/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.event.transport.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.Future;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;

/**
 * {@link EventSender} implementation that writes each event to a Netty {@link Channel}
 * borrowed from a {@link ChannelPool} and returns the channel to the pool afterwards.
 *
 * @param <T> type of event written to the channel
 */
class NettyEventSender<T> implements EventSender<T> {
    private static final String SEND_FAILED = "Send Failed";

    private final EventLoopGroup group;
    private final ChannelPool channelPool;
    private final SocketAddress remoteAddress;
    private final boolean singleEventPerConnection;
    private final Duration shutdownQuietPeriod;
    private final Duration shutdownTimeout;

    /**
     * Creates a sender that uses {@code ShutdownQuietPeriod.DEFAULT} and
     * {@code ShutdownTimeout.DEFAULT} when the event loop group is shut down.
     *
     * @param group event loop group shut down by {@link #close()}
     * @param channelPool pool that channels are acquired from and released to
     * @param remoteAddress remote address, used only in the messages built by this class
     * @param singleEventPerConnection when {@code true}, each channel is closed after one event
     */
    NettyEventSender(EventLoopGroup group, ChannelPool channelPool, SocketAddress remoteAddress, boolean singleEventPerConnection) {
        this(group, channelPool, remoteAddress, singleEventPerConnection, ShutdownQuietPeriod.DEFAULT.getDuration(), ShutdownTimeout.DEFAULT.getDuration());
    }

    /**
     * Creates a sender with explicit shutdown timing.
     *
     * @param group event loop group shut down by {@link #close()}
     * @param channelPool pool that channels are acquired from and released to
     * @param remoteAddress remote address, used only in the messages built by this class
     * @param singleEventPerConnection when {@code true}, each channel is closed after one event
     * @param shutdownQuietPeriod quiet period passed to the event loop group shutdown
     * @param shutdownTimeout timeout passed to the event loop group shutdown
     */
    NettyEventSender(EventLoopGroup group, ChannelPool channelPool, SocketAddress remoteAddress, boolean singleEventPerConnection, Duration shutdownQuietPeriod, Duration shutdownTimeout) {
        this.group = group;
        this.channelPool = channelPool;
        this.remoteAddress = remoteAddress;
        this.singleEventPerConnection = singleEventPerConnection;
        this.shutdownQuietPeriod = shutdownQuietPeriod;
        this.shutdownTimeout = shutdownTimeout;
    }

    /**
     * Acquires a channel from the pool, writes and flushes the event, waits for the write
     * to complete and then releases the channel.
     *
     * @param event event to write to the channel
     * @throws EventException when acquiring the channel or writing the event fails, or when
     *         the calling thread is interrupted while waiting for a channel; in the latter
     *         case the thread's interrupt status is restored before throwing
     */
    @Override
    public void sendEvent(T event) {
        try {
            Future<Channel> futureChannel = channelPool.acquire().sync();
            Channel channel = futureChannel.get();
            try {
                ChannelFuture channelFuture = channel.writeAndFlush(event);
                channelFuture.syncUninterruptibly();
            } finally {
                // Release even when the write fails so the channel is always handed back to the pool.
                releaseChannel(channel);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new EventException(getChannelMessage(SEND_FAILED), e);
        } catch (Exception e) {
            throw new EventException(getChannelMessage(SEND_FAILED), e);
        }
    }

    /**
     * Closes the channel pool and then shuts down the event loop group using the configured
     * quiet period and timeout, waiting for the shutdown future to complete. The group is
     * shut down even when closing the pool throws.
     */
    @Override
    public void close() {
        try {
            channelPool.close();
        } finally {
            group.shutdownGracefully(shutdownQuietPeriod.toMillis(), shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS).syncUninterruptibly();
        }
    }

    /**
     * @return description of this sender including the remote address
     */
    @Override
    public String toString() {
        return getChannelMessage("Event Sender");
    }

    /**
     * Builds a message made of the given prefix followed by the remote address.
     *
     * @param message prefix describing the context
     * @return formatted message
     */
    private String getChannelMessage(String message) {
        return String.format("%s Remote Address [%s]", message, remoteAddress);
    }

    /**
     * Returns the channel to the pool, closing it first when single-event-per-connection
     * mode is enabled.
     *
     * @param channel channel previously acquired from the pool
     */
    private void releaseChannel(Channel channel) {
        if (singleEventPerConnection) {
            // NOTE(review): close() is not awaited here; presumably the pool discards closed
            // channels on a later acquire - confirm against the pool configuration.
            channel.close();
        }
        channelPool.release(channel);
    }
}

