/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.util.AtomicDisposableReferenceCounter;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionRequestClient {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClient.class);
    private final Channel tcpChannel;
    private final NetworkClientHandler clientHandler;
    private final ConnectionID connectionId;
    private final PartitionRequestClientFactory clientFactory;
    private final AtomicDisposableReferenceCounter closeReferenceCounter = new AtomicDisposableReferenceCounter();

    PartitionRequestClient(Channel tcpChannel, NetworkClientHandler clientHandler, ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
        this.tcpChannel = (Channel)Preconditions.checkNotNull((Object)tcpChannel);
        this.clientHandler = (NetworkClientHandler)Preconditions.checkNotNull((Object)clientHandler);
        this.connectionId = (ConnectionID)Preconditions.checkNotNull((Object)connectionId);
        this.clientFactory = (PartitionRequestClientFactory)Preconditions.checkNotNull((Object)clientFactory);
    }

    boolean disposeIfNotUsed() {
        return this.closeReferenceCounter.disposeIfNotUsed();
    }

    boolean incrementReferenceCounter() {
        return this.closeReferenceCounter.increment();
    }

    public ChannelFuture requestSubpartition(ResultPartitionID partitionId, int subpartitionIndex, final RemoteInputChannel inputChannel, int delayMs) throws IOException {
        this.checkNotClosed();
        LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.", new Object[]{subpartitionIndex, partitionId, delayMs});
        this.clientHandler.addInputChannel(inputChannel);
        final NettyMessage.PartitionRequest request = new NettyMessage.PartitionRequest(partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());
        final ChannelFutureListener listener = new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    PartitionRequestClient.this.clientHandler.removeInputChannel(inputChannel);
                    inputChannel.onError(new LocalTransportException("Sending the partition request failed.", future.channel().localAddress(), future.cause()));
                }
            }
        };
        if (delayMs == 0) {
            ChannelFuture f = this.tcpChannel.writeAndFlush((Object)request);
            f.addListener((GenericFutureListener)listener);
            return f;
        }
        final ChannelFuture[] f = new ChannelFuture[1];
        this.tcpChannel.eventLoop().schedule(new Runnable(){

            @Override
            public void run() {
                f[0] = PartitionRequestClient.this.tcpChannel.writeAndFlush((Object)request);
                f[0].addListener((GenericFutureListener)listener);
            }
        }, (long)delayMs, TimeUnit.MILLISECONDS);
        return f[0];
    }

    public void sendTaskEvent(ResultPartitionID partitionId, TaskEvent event, final RemoteInputChannel inputChannel) throws IOException {
        this.checkNotClosed();
        this.tcpChannel.writeAndFlush((Object)new NettyMessage.TaskEventRequest(event, partitionId, inputChannel.getInputChannelId())).addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    inputChannel.onError(new LocalTransportException("Sending the task event failed.", future.channel().localAddress(), future.cause()));
                }
            }
        });
    }

    public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
        if (!this.closeReferenceCounter.isDisposed()) {
            this.clientHandler.notifyCreditAvailable(inputChannel);
        }
    }

    public void close(RemoteInputChannel inputChannel) throws IOException {
        this.clientHandler.removeInputChannel(inputChannel);
        if (this.closeReferenceCounter.decrement()) {
            this.tcpChannel.writeAndFlush((Object)new NettyMessage.CloseRequest()).addListener((GenericFutureListener)ChannelFutureListener.CLOSE_ON_FAILURE);
            this.clientFactory.destroyPartitionRequestClient(this.connectionId, this);
        } else {
            this.clientHandler.cancelRequestFor(inputChannel.getInputChannelId());
        }
    }

    private void checkNotClosed() throws IOException {
        if (this.closeReferenceCounter.isDisposed()) {
            throw new LocalTransportException("Channel closed.", this.tcpChannel.localAddress());
        }
    }
}

