/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.netty.exception.TransportException;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CreditBasedPartitionRequestClientHandler
extends ChannelInboundHandlerAdapter
implements NetworkClientHandler {
    private static final Logger LOG = LoggerFactory.getLogger(CreditBasedPartitionRequestClientHandler.class);
    private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<InputChannelID, RemoteInputChannel>();
    private final ArrayDeque<RemoteInputChannel> inputChannelsWithCredit = new ArrayDeque();
    private final AtomicReference<Throwable> channelError = new AtomicReference();
    private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();
    private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = new ConcurrentHashMap<InputChannelID, InputChannelID>();
    private volatile ChannelHandlerContext ctx;

    CreditBasedPartitionRequestClientHandler() {
    }

    @Override
    public void addInputChannel(RemoteInputChannel listener) throws IOException {
        this.checkError();
        this.inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
    }

    @Override
    public void removeInputChannel(RemoteInputChannel listener) {
        this.inputChannels.remove((Object)listener.getInputChannelId());
    }

    @Override
    public void cancelRequestFor(InputChannelID inputChannelId) {
        if (inputChannelId == null || this.ctx == null) {
            return;
        }
        if (this.cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) {
            this.ctx.writeAndFlush((Object)new NettyMessage.CancelPartitionRequest(inputChannelId));
        }
    }

    @Override
    public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
        this.ctx.executor().execute(() -> this.ctx.pipeline().fireUserEventTriggered((Object)inputChannel));
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (this.ctx == null) {
            this.ctx = ctx;
        }
        super.channelActive(ctx);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (!this.inputChannels.isEmpty()) {
            SocketAddress remoteAddr = ctx.channel().remoteAddress();
            this.notifyAllChannelsOfErrorAndClose(new RemoteTransportException("Connection unexpectedly closed by remote task manager '" + remoteAddr + "'. This might indicate that the remote task manager was lost.", remoteAddr));
        }
        super.channelInactive(ctx);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof TransportException) {
            this.notifyAllChannelsOfErrorAndClose(cause);
        } else {
            TransportException tex;
            SocketAddress remoteAddr = ctx.channel().remoteAddress();
            if (cause instanceof IOException && cause.getMessage().equals("Connection reset by peer")) {
                tex = new RemoteTransportException("Lost connection to task manager '" + remoteAddr + "'. This indicates that the remote task manager was lost.", remoteAddr, cause);
            } else {
                SocketAddress localAddr = ctx.channel().localAddress();
                tex = new LocalTransportException(String.format("%s (connection to '%s')", cause.getMessage(), remoteAddr), localAddr, cause);
            }
            this.notifyAllChannelsOfErrorAndClose(tex);
        }
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            this.decodeMsg(msg);
        }
        catch (Throwable t) {
            this.notifyAllChannelsOfErrorAndClose(t);
        }
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof RemoteInputChannel) {
            boolean triggerWrite = this.inputChannelsWithCredit.isEmpty();
            this.inputChannelsWithCredit.add((RemoteInputChannel)msg);
            if (triggerWrite) {
                this.writeAndFlushNextMessageIfPossible(ctx.channel());
            }
        } else {
            ctx.fireUserEventTriggered(msg);
        }
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        this.writeAndFlushNextMessageIfPossible(ctx.channel());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyAllChannelsOfErrorAndClose(Throwable cause) {
        if (this.channelError.compareAndSet(null, cause)) {
            try {
                for (RemoteInputChannel inputChannel : this.inputChannels.values()) {
                    inputChannel.onError(cause);
                }
            }
            catch (Throwable t) {
                LOG.warn("An Exception was thrown during error notification of a remote input channel.", t);
            }
            finally {
                this.inputChannels.clear();
                this.inputChannelsWithCredit.clear();
                if (this.ctx != null) {
                    this.ctx.close();
                }
            }
        }
    }

    private void checkError() throws IOException {
        Throwable t = this.channelError.get();
        if (t != null) {
            if (t instanceof IOException) {
                throw (IOException)t;
            }
            throw new IOException("There has been an error in the channel.", t);
        }
    }

    private void decodeMsg(Object msg) throws Throwable {
        Class<?> msgClazz = msg.getClass();
        if (msgClazz == NettyMessage.BufferResponse.class) {
            NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse)msg;
            RemoteInputChannel inputChannel = (RemoteInputChannel)this.inputChannels.get((Object)bufferOrEvent.receiverId);
            if (inputChannel == null) {
                bufferOrEvent.releaseBuffer();
                this.cancelRequestFor(bufferOrEvent.receiverId);
                return;
            }
            this.decodeBufferOrEvent(inputChannel, bufferOrEvent);
        } else if (msgClazz == NettyMessage.ErrorResponse.class) {
            NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse)msg;
            SocketAddress remoteAddr = this.ctx.channel().remoteAddress();
            if (error.isFatalError()) {
                this.notifyAllChannelsOfErrorAndClose(new RemoteTransportException("Fatal error at remote task manager '" + remoteAddr + "'.", remoteAddr, error.cause));
            } else {
                RemoteInputChannel inputChannel = (RemoteInputChannel)this.inputChannels.get((Object)error.receiverId);
                if (inputChannel != null) {
                    if (error.cause.getClass() == PartitionNotFoundException.class) {
                        inputChannel.onFailedPartitionRequest();
                    } else {
                        inputChannel.onError(new RemoteTransportException("Error at remote task manager '" + remoteAddr + "'.", remoteAddr, error.cause));
                    }
                }
            }
        } else {
            throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
        try {
            ByteBuf nettyBuffer = bufferOrEvent.getNettyBuffer();
            int receivedSize = nettyBuffer.readableBytes();
            if (bufferOrEvent.isBuffer()) {
                if (receivedSize == 0) {
                    inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                    return;
                }
                Buffer buffer = inputChannel.requestBuffer();
                if (buffer != null) {
                    nettyBuffer.readBytes(buffer.asByteBuf(), receivedSize);
                    inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                    return;
                } else {
                    if (!inputChannel.isReleased()) throw new IllegalStateException("No buffer available in credit-based input channel.");
                    this.cancelRequestFor(bufferOrEvent.receiverId);
                }
                return;
            } else {
                byte[] byteArray = new byte[receivedSize];
                nettyBuffer.readBytes(byteArray);
                MemorySegment memSeg = MemorySegmentFactory.wrap((byte[])byteArray);
                NetworkBuffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
                inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
            }
            return;
        }
        finally {
            bufferOrEvent.releaseBuffer();
        }
    }

    private void writeAndFlushNextMessageIfPossible(Channel channel) {
        RemoteInputChannel inputChannel;
        if (this.channelError.get() != null || !channel.isWritable()) {
            return;
        }
        do {
            if ((inputChannel = this.inputChannelsWithCredit.poll()) != null) continue;
            return;
        } while (inputChannel.isReleased());
        NettyMessage.AddCredit msg = new NettyMessage.AddCredit(inputChannel.getPartitionId(), inputChannel.getAndResetUnannouncedCredit(), inputChannel.getInputChannelId());
        channel.writeAndFlush((Object)msg).addListener((GenericFutureListener)this.writeListener);
    }

    private class WriteAndFlushNextMessageIfPossibleListener
    implements ChannelFutureListener {
        private WriteAndFlushNextMessageIfPossibleListener() {
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            try {
                if (future.isSuccess()) {
                    CreditBasedPartitionRequestClientHandler.this.writeAndFlushNextMessageIfPossible(future.channel());
                } else if (future.cause() != null) {
                    CreditBasedPartitionRequestClientHandler.this.notifyAllChannelsOfErrorAndClose(future.cause());
                } else {
                    CreditBasedPartitionRequestClientHandler.this.notifyAllChannelsOfErrorAndClose(new IllegalStateException("Sending cancelled by user."));
                }
            }
            catch (Throwable t) {
                CreditBasedPartitionRequestClientHandler.this.notifyAllChannelsOfErrorAndClose(t);
            }
        }
    }
}

