/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.runtime.io.network.Buffer;
import org.apache.flink.runtime.io.network.Envelope;
import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.jobgraph.JobID;

/**
 * Inbound Netty handler that turns the raw byte stream of a channel into {@link Envelope}s and
 * forwards every completed envelope to the next handler in the pipeline.
 *
 * <p>An envelope is decoded in up to three stages: a fixed-size header, an optional block of
 * serialized events, and an optional data payload. The payload is copied into a {@link Buffer}
 * requested from the {@link BufferProvider} that the {@link BufferProviderBroker} returns for the
 * envelope's job ID and source channel ID. Decoding state is kept in fields, so an envelope may
 * span several {@code channelRead} calls.
 *
 * <p>If the provider has no buffer available, the decoder registers itself as a
 * {@link BufferAvailabilityListener}, keeps ("stages") the partially consumed {@link ByteBuf} and
 * switches the channel's auto read off. Once {@link #bufferAvailable(Buffer)} is invoked, decoding
 * of the staged bytes is resumed by a task submitted to the channel's event loop.
 */
public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter
        implements BufferAvailabilityListener {

    private static final Log LOG = LogFactory.getLog(InboundEnvelopeDecoder.class);

    /**
     * Number of header bytes read per envelope: magic number (int), sequence number (int), job ID,
     * source channel ID, events size (int) and buffer size (int).
     * NOTE(review): implies that job ID and channel ID serialize to 16 bytes each - confirm.
     */
    private static final int HEADER_SIZE = 48;

    /** Marker expected at the start of every envelope header (same int value as -1159983106). */
    private static final int MAGIC_NUMBER = 0xBADC0FFE;

    /** Resolves the buffer provider for a (job ID, source channel ID) pair. */
    private final BufferProviderBroker bufferProviderBroker;

    /** Reusable task that resumes decoding on the event loop after a buffer became available. */
    private final BufferAvailabilityChangedTask bufferAvailabilityChangedTask =
            new BufferAvailabilityChangedTask();

    /** Hands buffers over from {@link #bufferAvailable(Buffer)} to the resume task. */
    private final ConcurrentLinkedQueue<Buffer> bufferBroker = new ConcurrentLinkedQueue<Buffer>();

    /** Collects the header bytes of the envelope currently being decoded. */
    private final ByteBuffer headerBuffer;

    /** Envelope under construction, or {@code null} while waiting for the next header. */
    private Envelope currentEnvelope;

    /** Target for the serialized events of the current envelope, or {@code null} if none pending. */
    private ByteBuffer currentEventsBuffer;

    /** View on the data buffer of the current envelope, or {@code null} if none pending. */
    private ByteBuffer currentDataBuffer;

    /** Payload size for which a buffer still has to be obtained; 0 if no request is outstanding. */
    private int currentBufferRequestSize;

    /** Provider resolved for {@link #lastJobId} / {@link #lastSourceId}. */
    private BufferProvider currentBufferProvider;

    private JobID lastJobId;

    private ChannelID lastSourceId;

    /** Partially consumed input that waits for a buffer to become available. */
    private ByteBuf stagedBuffer;

    /** Context captured on the first {@code channelActive} call; used by the resume task. */
    private ChannelHandlerContext channelHandlerContext;

    /** Payload bytes that still have to be discarded from upcoming reads. */
    private int bytesToSkip;

    /**
     * Creates a decoder that obtains its buffer providers from the given broker.
     *
     * @param bufferProviderBroker broker used to look up the buffer provider per envelope target
     */
    public InboundEnvelopeDecoder(BufferProviderBroker bufferProviderBroker) {
        this.bufferProviderBroker = bufferProviderBroker;
        this.headerBuffer = ByteBuffer.allocateDirect(HEADER_SIZE);
    }

    /**
     * Remembers the handler context (first call only) and propagates the event.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (channelHandlerContext == null) {
            channelHandlerContext = ctx;
        }
        super.channelActive(ctx);
    }

    /**
     * Decodes the received {@link ByteBuf}, after first discarding any bytes that are still marked
     * to be skipped.
     *
     * @throws IllegalStateException if a buffer is currently staged
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (stagedBuffer != null) {
            throw new IllegalStateException("No channel read event should be fired as long as the a buffer is staged.");
        }

        ByteBuf in = (ByteBuf) msg;

        if (bytesToSkip > 0) {
            bytesToSkip = skipBytes(in, bytesToSkip);

            // The whole message was discarded and more still has to be skipped.
            if (bytesToSkip > 0) {
                in.release();
                return;
            }
        }

        decodeBuffer(in, ctx);
    }

    /**
     * Decodes as many envelopes as possible from {@code in} and fires each completed one.
     *
     * @return {@code true} if {@code in} was consumed and released; {@code false} if it has been
     *         staged because no buffer was available and a listener was registered
     * @throws IOException if decoding an envelope fails
     * @throws IllegalStateException if decoding stopped although {@code in} still has readable bytes
     */
    private boolean decodeBuffer(ByteBuf in, ChannelHandlerContext ctx) throws IOException {
        DecoderState decoderState;
        while ((decoderState = decodeEnvelope(in)) != DecoderState.PENDING) {
            if (decoderState == DecoderState.COMPLETE) {
                ctx.fireChannelRead(currentEnvelope);
                currentEnvelope = null;
            } else if (decoderState == DecoderState.NO_BUFFER_AVAILABLE) {
                switch (currentBufferProvider.registerBufferAvailabilityListener(this)) {
                    case SUCCEEDED_REGISTERED: {
                        if (ctx.channel().config().isAutoRead()) {
                            ctx.channel().config().setAutoRead(false);
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(String.format("Set channel %s auto read to false.", ctx.channel()));
                            }
                        }
                        // Keep the unread bytes; the extra reference taken here is dropped by
                        // the resume task right before it continues decoding.
                        stagedBuffer = in;
                        stagedBuffer.retain();
                        return false;
                    }
                    case FAILED_BUFFER_AVAILABLE: {
                        // Nothing to do here: the next loop iteration requests the buffer again.
                        break;
                    }
                    case FAILED_BUFFER_POOL_DESTROYED: {
                        // Drop the payload and the envelope it belongs to.
                        bytesToSkip = skipBytes(in, currentBufferRequestSize);
                        currentBufferRequestSize = 0;
                        currentEventsBuffer = null;
                        currentEnvelope = null;
                        break;
                    }
                    default: {
                        break;
                    }
                }
            }
        }

        if (in.isReadable()) {
            throw new IllegalStateException("Every buffer should have been fullyconsumed after *successfully* decoding it (if it was not successful, the buffer will be staged for later consumption).");
        }

        in.release();
        return true;
    }

    /**
     * Queues the buffer and schedules the resume task on the channel's event loop.
     */
    @Override
    public void bufferAvailable(Buffer buffer) throws Exception {
        bufferBroker.offer(buffer);
        channelHandlerContext.channel().eventLoop().execute(bufferAvailabilityChangedTask);
    }

    /**
     * Advances decoding of the current envelope with the readable bytes of {@code in}.
     *
     * @return {@link DecoderState#COMPLETE} if {@link #currentEnvelope} is fully decoded,
     *         {@link DecoderState#PENDING} if more input is needed, or
     *         {@link DecoderState#NO_BUFFER_AVAILABLE} if no buffer could be obtained for the payload
     * @throws IOException if the header does not start with the expected magic number, or if
     *         requesting the buffer fails
     */
    private DecoderState decodeEnvelope(ByteBuf in) throws IOException {
        // Stage 1: header.
        if (currentEnvelope == null) {
            copy(in, headerBuffer);
            if (headerBuffer.hasRemaining()) {
                return DecoderState.PENDING;
            }

            headerBuffer.flip();

            int magicNum = headerBuffer.getInt();
            if (magicNum != MAGIC_NUMBER) {
                throw new IOException("Network stream corrupted: invalid magicnumber in current envelope header.");
            }

            int seqNum = headerBuffer.getInt();
            JobID jobId = JobID.fromByteBuffer(headerBuffer);
            ChannelID sourceId = ChannelID.fromByteBuffer(headerBuffer);
            currentEnvelope = new Envelope(seqNum, jobId, sourceId);

            int eventsSize = headerBuffer.getInt();
            int bufferSize = headerBuffer.getInt();

            currentEventsBuffer = eventsSize > 0 ? ByteBuffer.allocate(eventsSize) : null;
            currentBufferRequestSize = Math.max(bufferSize, 0);

            headerBuffer.clear();
        }

        // Stage 2: serialized events.
        if (currentEventsBuffer != null) {
            copy(in, currentEventsBuffer);
            if (currentEventsBuffer.hasRemaining()) {
                return DecoderState.PENDING;
            }

            currentEventsBuffer.flip();
            currentEnvelope.setEventsSerialized(currentEventsBuffer);
            currentEventsBuffer = null;
        }

        // Stage 3a: obtain the buffer that receives the payload.
        if (currentBufferRequestSize > 0) {
            JobID jobId = currentEnvelope.getJobID();
            ChannelID sourceId = currentEnvelope.getSource();

            Buffer buffer = requestBufferForTarget(jobId, sourceId, currentBufferRequestSize);
            if (buffer == null) {
                return DecoderState.NO_BUFFER_AVAILABLE;
            }

            currentEnvelope.setBuffer(buffer);
            currentDataBuffer = buffer.getMemorySegment().wrap(0, currentBufferRequestSize);
            currentBufferRequestSize = 0;
        }

        // Stage 3b: payload.
        if (currentDataBuffer != null) {
            copy(in, currentDataBuffer);
            if (currentDataBuffer.hasRemaining()) {
                return DecoderState.PENDING;
            }
            currentDataBuffer = null;
        }

        return DecoderState.COMPLETE;
    }

    /**
     * Requests a buffer of the given size from the provider of the given target. The provider is
     * looked up again only when the job ID or source channel ID differs from the previous call.
     *
     * @return the requested buffer; callers treat {@code null} as "no buffer available"
     */
    private Buffer requestBufferForTarget(JobID jobId, ChannelID sourceId, int size) throws IOException {
        boolean sameTarget = jobId.equals(lastJobId) && sourceId.equals(lastSourceId);
        if (!sameTarget) {
            lastJobId = jobId;
            lastSourceId = sourceId;
            currentBufferProvider = bufferProviderBroker.getBufferProvider(jobId, sourceId);
        }
        return currentBufferProvider.requestBuffer(size);
    }

    /**
     * Copies as many bytes as possible from {@code src} to {@code dst}, bounded by the readable
     * bytes of {@code src} and the remaining space of {@code dst}.
     */
    private void copy(ByteBuf src, ByteBuffer dst) {
        if (!src.isReadable()) {
            return;
        }

        int readable = src.readableBytes();
        if (readable < dst.remaining()) {
            // readBytes(ByteBuffer) fills dst up to its limit, so shrink the limit temporarily
            // to exactly what src can deliver and restore it afterwards.
            int oldLimit = dst.limit();
            dst.limit(dst.position() + readable);
            src.readBytes(dst);
            dst.limit(oldLimit);
        } else {
            src.readBytes(dst);
        }
    }

    /**
     * Discards up to {@code toSkip} readable bytes from {@code in}.
     *
     * <p>Uses {@link ByteBuf#skipBytes(int)}, which only moves the reader index.
     * {@code readBytes(int)} would instead return a newly allocated buffer that has to be released.
     *
     * @return the number of bytes that still have to be skipped from subsequent input
     */
    private int skipBytes(ByteBuf in, int toSkip) {
        int readable = in.readableBytes();
        if (toSkip <= readable) {
            in.skipBytes(toSkip);
            return 0;
        }

        in.skipBytes(readable);
        return toSkip - readable;
    }

    /**
     * Task submitted to the channel's event loop after a buffer has been offered to
     * {@link #bufferBroker}: attaches that buffer to the current envelope and resumes decoding of
     * the staged input.
     */
    private class BufferAvailabilityChangedTask implements Runnable {

        @Override
        public void run() {
            Buffer availableBuffer = bufferBroker.poll();
            if (availableBuffer == null) {
                throw new IllegalStateException("The BufferAvailabilityChangedTaskshould only be executed when a Buffer has been offeredto the Buffer broker (after becoming available).");
            }

            // Complete the buffer request that decodeEnvelope could not satisfy earlier.
            availableBuffer.limitSize(currentBufferRequestSize);
            currentEnvelope.setBuffer(availableBuffer);
            currentDataBuffer = availableBuffer.getMemorySegment().wrap(0, currentBufferRequestSize);
            currentBufferRequestSize = 0;

            // Drops the reference taken when the buffer was staged.
            stagedBuffer.release();

            try {
                if (decodeBuffer(stagedBuffer, channelHandlerContext)) {
                    stagedBuffer = null;
                    channelHandlerContext.channel().config().setAutoRead(true);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(String.format("Set channel %s auto read to true.", channelHandlerContext.channel()));
                    }
                }
            } catch (IOException e) {
                availableBuffer.recycleBuffer();
                // Do not swallow the failure: auto read stays off, so without a report the
                // channel would stall silently. Forward it through the pipeline, as happens
                // when the same exception escapes channelRead.
                LOG.error("Decoding of the staged buffer failed after a buffer became available.", e);
                channelHandlerContext.fireExceptionCaught(e);
            }
        }
    }

    /** Result of a single {@code decodeEnvelope} step. */
    private enum DecoderState {
        /** The current envelope has been decoded completely. */
        COMPLETE,
        /** More input bytes are needed to continue. */
        PENDING,
        /** No buffer could be obtained for the envelope's payload. */
        NO_BUFFER_AVAILABLE
    }
}

