/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import org.apache.nifi.cluster.NodeInformant;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.remote.AbstractCommunicationsSession;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.RemoteResourceFactory;
import org.apache.nifi.remote.RemoteSiteListener;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.ServerProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketRemoteSiteListener
implements RemoteSiteListener {
    public static final String DEFAULT_FLOWFILE_PATH = "./";
    private final int socketPort;
    private final SSLContext sslContext;
    private final NodeInformant nodeInformant;
    private final AtomicReference<ProcessGroup> rootGroup = new AtomicReference();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private static final Logger LOG = LoggerFactory.getLogger(SocketRemoteSiteListener.class);

    public SocketRemoteSiteListener(int socketPort, SSLContext sslContext) {
        this(socketPort, sslContext, null);
    }

    public SocketRemoteSiteListener(int socketPort, SSLContext sslContext, NodeInformant nodeInformant) {
        this.socketPort = socketPort;
        this.sslContext = sslContext;
        this.nodeInformant = nodeInformant;
    }

    @Override
    public void setRootGroup(ProcessGroup rootGroup) {
        this.rootGroup.set(rootGroup);
    }

    @Override
    public void start() throws IOException {
        final boolean secure = this.sslContext != null;
        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(true);
        serverSocketChannel.bind(new InetSocketAddress(this.socketPort));
        this.stopped.set(false);
        Thread listenerThread = new Thread(new Runnable(){
            private int threadCount = 0;

            @Override
            public void run() {
                while (!SocketRemoteSiteListener.this.stopped.get()) {
                    ProcessGroup processGroup = (ProcessGroup)SocketRemoteSiteListener.this.rootGroup.get();
                    if (SocketRemoteSiteListener.this.nodeInformant == null && (processGroup == null || processGroup.getInputPorts().isEmpty() && processGroup.getOutputPorts().isEmpty())) {
                        try {
                            Thread.sleep(2000L);
                        }
                        catch (Exception e) {}
                        continue;
                    }
                    LOG.trace("Accepting Connection...");
                    Socket acceptedSocket = null;
                    try {
                        serverSocketChannel.configureBlocking(false);
                        ServerSocket serverSocket = serverSocketChannel.socket();
                        serverSocket.setSoTimeout(2000);
                        while (!SocketRemoteSiteListener.this.stopped.get() && acceptedSocket == null) {
                            try {
                                acceptedSocket = serverSocket.accept();
                            }
                            catch (SocketTimeoutException ste) {}
                        }
                    }
                    catch (IOException e) {
                        LOG.error("RemoteSiteListener Unable to accept connection due to {}", (Object)e.toString());
                        if (!LOG.isDebugEnabled()) continue;
                        LOG.error("", (Throwable)e);
                        continue;
                    }
                    LOG.trace("Got connection");
                    final Socket socket = acceptedSocket;
                    final SocketChannel socketChannel = socket.getChannel();
                    Thread thread = new Thread(new Runnable(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         * Loose catch block
                         * Enabled aggressive block sorting
                         * Enabled unnecessary exception pruning
                         * Enabled aggressive exception aggregation
                         */
                        @Override
                        public void run() {
                            Peer peer;
                            ServerProtocol protocol;
                            String peerUri;
                            block68: {
                                OutputStream socketOut;
                                InputStream socketIn;
                                String dn;
                                AbstractCommunicationsSession commsSession;
                                String hostname = socket.getInetAddress().getHostName();
                                int slashIndex = hostname.indexOf("/");
                                if (slashIndex == 0) {
                                    hostname = hostname.substring(1);
                                } else if (slashIndex > 0) {
                                    hostname = hostname.substring(0, slashIndex);
                                }
                                int port = socket.getPort();
                                peerUri = "nifi://" + hostname + ":" + port;
                                try {
                                    if (secure) {
                                        SSLSocketChannel sslSocketChannel = new SSLSocketChannel(SocketRemoteSiteListener.this.sslContext, socketChannel, false);
                                        LOG.trace("Channel is secure; connecting...");
                                        sslSocketChannel.connect();
                                        LOG.trace("Channel connected");
                                        commsSession = new SSLSocketChannelCommunicationsSession(sslSocketChannel, peerUri);
                                        dn = sslSocketChannel.getDn();
                                        commsSession.setUserDn(dn);
                                    } else {
                                        commsSession = new SocketChannelCommunicationsSession(socketChannel, peerUri);
                                        dn = null;
                                    }
                                }
                                catch (Exception e) {
                                    LOG.error("RemoteSiteListener Unable to accept connection from {} due to {}", (Object)socket, (Object)e.toString());
                                    if (LOG.isDebugEnabled()) {
                                        LOG.error("", (Throwable)e);
                                    }
                                    try {
                                        socketChannel.close();
                                        return;
                                    }
                                    catch (IOException swallow) {
                                        // empty catch block
                                    }
                                    return;
                                }
                                LOG.info("Received connection from {}, User DN: {}", (Object)socket.getInetAddress(), dn);
                                try {
                                    socketIn = commsSession.getInput().getInputStream();
                                    socketOut = commsSession.getOutput().getOutputStream();
                                }
                                catch (IOException e) {
                                    LOG.error("Connection dropped from {} before any data was transmitted", (Object)peerUri);
                                    try {
                                        commsSession.close();
                                        return;
                                    }
                                    catch (IOException ioe) {
                                        // empty catch block
                                    }
                                    return;
                                }
                                DataInputStream dis = new DataInputStream(socketIn);
                                DataOutputStream dos = new DataOutputStream(socketOut);
                                protocol = null;
                                peer = null;
                                LOG.debug("Verifying magic bytes...");
                                SocketRemoteSiteListener.this.verifyMagicBytes(dis, peerUri);
                                LOG.debug("Receiving Server Protocol Negotiation");
                                protocol = (ServerProtocol)RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
                                protocol.setRootProcessGroup((ProcessGroup)SocketRemoteSiteListener.this.rootGroup.get());
                                protocol.setNodeInformant(SocketRemoteSiteListener.this.nodeInformant);
                                peer = new Peer((CommunicationsSession)commsSession, peerUri);
                                LOG.debug("Handshaking....");
                                protocol.handshake(peer);
                                if (!protocol.isHandshakeSuccessful()) {
                                    LOG.error("Handshake failed with {}; closing connection", (Object)peer);
                                    try {
                                        peer.close();
                                    }
                                    catch (IOException e) {
                                        LOG.warn("Failed to close {} due to {}", (Object)peer, (Object)e);
                                    }
                                    LOG.trace("Cleaning up");
                                    try {
                                        if (protocol != null && peer != null) {
                                            protocol.shutdown(peer);
                                        }
                                    }
                                    catch (Exception protocolException) {
                                        LOG.warn("Failed to shutdown protocol due to {}", (Object)protocolException.toString());
                                    }
                                    try {
                                        if (peer != null) {
                                            peer.close();
                                        }
                                    }
                                    catch (Exception peerException) {
                                        LOG.warn("Failed to close peer due to {}; some resources may not be appropriately cleaned up", (Object)peerException.toString());
                                    }
                                    LOG.trace("Finished cleaning up");
                                    return;
                                }
                                commsSession.setTimeout((int)protocol.getRequestExpiration());
                                LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}", new Object[]{protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer});
                                try {
                                    block49: while (true) {
                                        if (protocol.isShutdown()) {
                                            LOG.debug("Finished communicating with {} ({})", (Object)peer, protocol);
                                            break;
                                        }
                                        LOG.trace("Getting Protocol Request Type...");
                                        int timeoutCount = 0;
                                        RequestType requestType = null;
                                        while (requestType == null) {
                                            try {
                                                requestType = protocol.getRequestType(peer);
                                            }
                                            catch (SocketTimeoutException e) {
                                                LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", new Object[]{this, protocol, peer});
                                                requestType = null;
                                                if (++timeoutCount < 2) continue;
                                                throw e;
                                            }
                                        }
                                        LOG.debug("Request type from {} is {}", protocol, (Object)requestType);
                                        switch (requestType) {
                                            case NEGOTIATE_FLOWFILE_CODEC: {
                                                protocol.negotiateCodec(peer);
                                                break;
                                            }
                                            case RECEIVE_FLOWFILES: {
                                                protocol.getPort().transferFlowFiles(peer, protocol, new HashMap());
                                                break;
                                            }
                                            case SEND_FLOWFILES: {
                                                protocol.getPort().receiveFlowFiles(peer, protocol, new HashMap());
                                                break;
                                            }
                                            case REQUEST_PEER_LIST: {
                                                protocol.sendPeerList(peer);
                                                break;
                                            }
                                            case SHUTDOWN: {
                                                protocol.shutdown(peer);
                                                continue block49;
                                            }
                                        }
                                    }
                                }
                                catch (Exception e) {
                                    LOG.error("Unable to communicate with remote instance {} ({}) due to {}; closing connection", new Object[]{peer, protocol, e.toString()});
                                    if (!LOG.isDebugEnabled()) break block68;
                                    LOG.error("", (Throwable)e);
                                }
                            }
                            LOG.trace("Cleaning up");
                            try {
                                if (protocol != null && peer != null) {
                                    protocol.shutdown(peer);
                                }
                            }
                            catch (Exception protocolException) {
                                LOG.warn("Failed to shutdown protocol due to {}", (Object)protocolException.toString());
                            }
                            try {
                                if (peer != null) {
                                    peer.close();
                                }
                            }
                            catch (Exception peerException) {
                                LOG.warn("Failed to close peer due to {}; some resources may not be appropriately cleaned up", (Object)peerException.toString());
                            }
                            LOG.trace("Finished cleaning up");
                            return;
                            catch (IOException e) {
                                LOG.error("Unable to communicate with remote instance {} due to {}; closing connection", peer, (Object)e.toString());
                                if (!LOG.isDebugEnabled()) return;
                                LOG.error("", (Throwable)e);
                                return;
                                {
                                    catch (Throwable throwable) {
                                        throw throwable;
                                    }
                                }
                                catch (Throwable t) {
                                    block69: {
                                        LOG.error("Handshake failed when communicating with {}; closing connection. Reason for failure: {}", (Object)peerUri, (Object)t.toString());
                                        if (!LOG.isDebugEnabled()) break block69;
                                        LOG.error("", t);
                                    }
                                    LOG.trace("Cleaning up");
                                    try {
                                        if (protocol != null && peer != null) {
                                            protocol.shutdown(peer);
                                        }
                                    }
                                    catch (Exception protocolException) {
                                        LOG.warn("Failed to shutdown protocol due to {}", (Object)protocolException.toString());
                                    }
                                    try {
                                        if (peer != null) {
                                            peer.close();
                                        }
                                    }
                                    catch (Exception peerException) {
                                        LOG.warn("Failed to close peer due to {}; some resources may not be appropriately cleaned up", (Object)peerException.toString());
                                    }
                                    LOG.trace("Finished cleaning up");
                                    return;
                                }
                            }
                            finally {
                                LOG.trace("Cleaning up");
                                try {
                                    if (protocol != null && peer != null) {
                                        protocol.shutdown(peer);
                                    }
                                }
                                catch (Exception protocolException) {
                                    LOG.warn("Failed to shutdown protocol due to {}", (Object)protocolException.toString());
                                }
                                try {
                                    if (peer != null) {
                                        peer.close();
                                    }
                                }
                                catch (Exception peerException) {
                                    LOG.warn("Failed to close peer due to {}; some resources may not be appropriately cleaned up", (Object)peerException.toString());
                                }
                                LOG.trace("Finished cleaning up");
                            }
                        }
                    });
                    thread.setName("Site-to-Site Worker Thread-" + this.threadCount++);
                    thread.start();
                }
            }
        });
        listenerThread.setName("Site-to-Site Listener");
        listenerThread.start();
    }

    @Override
    public int getPort() {
        return this.socketPort;
    }

    @Override
    public void stop() {
        this.stopped.set(true);
    }

    private void verifyMagicBytes(InputStream in, String peerDescription) throws IOException, HandshakeException {
        byte[] receivedMagicBytes = new byte[CommunicationsSession.MAGIC_BYTES.length];
        try {
            for (int i = 0; i < receivedMagicBytes.length; ++i) {
                receivedMagicBytes[i] = (byte)in.read();
            }
        }
        catch (EOFException e) {
            throw new HandshakeException("Handshake failed (not enough bytes) when communicating with " + peerDescription);
        }
        if (!Arrays.equals(CommunicationsSession.MAGIC_BYTES, receivedMagicBytes)) {
            throw new HandshakeException("Handshake with " + peerDescription + " failed because the Magic Header was not present");
        }
    }
}

