/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper.server.quorum;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.quorum.QuorumPeer;

public class QuorumCnxManager {
    private static final Logger LOG = Logger.getLogger(QuorumCnxManager.class);
    static final int CAPACITY = 100;
    static final int MAX_CONNECTION_ATTEMPTS = 2;
    int packetSize;
    int port = this.port;
    long challenge;
    QuorumPeer self;
    ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
    ArrayBlockingQueue<Message> recvQueue = new ArrayBlockingQueue(100);
    boolean shutdown = false;
    Listener listener;

    public QuorumCnxManager(QuorumPeer self) {
        this.queueSendMap = new ConcurrentHashMap();
        this.senderWorkerMap = new ConcurrentHashMap();
        this.self = self;
        this.genChallenge();
        this.listener = new Listener();
        this.listener.start();
    }

    void genChallenge() {
        try {
            long newValue;
            Random rand = new Random(System.currentTimeMillis() + (long)InetAddress.getLocalHost().hashCode());
            this.challenge = newValue = rand.nextLong();
        }
        catch (UnknownHostException e) {
            LOG.error((Object)"Cannot resolve local address");
            this.challenge = 0L;
        }
    }

    boolean initiateConnection(SocketChannel s, Long sid) {
        boolean challenged = true;
        boolean wins = false;
        try {
            byte[] msgBytes = new byte[8];
            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
            msgBuffer.putLong(this.self.getId());
            msgBuffer.position(0);
            s.write(msgBuffer);
        }
        catch (IOException e) {
            LOG.warn((Object)("Exception reading or writing challenge: " + e.toString()));
            return false;
        }
        if (sid > this.self.getId()) {
            try {
                LOG.warn((Object)("Have smaller server identifier, so dropping the connection: (" + sid + ", " + this.self.getId()));
                s.socket().close();
            }
            catch (IOException e) {
                LOG.warn((Object)("Error when closing socket or trying to reopen connection: " + e.toString()));
            }
        } else {
            if (s != null) {
                SendWorker sw = new SendWorker(s, sid);
                RecvWorker rw = new RecvWorker(s, sid);
                sw.setRecv(rw);
                if (this.senderWorkerMap.containsKey(sid)) {
                    this.senderWorkerMap.get(sid).finish();
                }
                if (!this.queueSendMap.containsKey(sid)) {
                    this.queueSendMap.put(sid, new ArrayBlockingQueue(100));
                }
                this.senderWorkerMap.put(sid, sw);
                sw.start();
                rw.start();
                return true;
            }
            LOG.warn((Object)"Channel null");
            return false;
        }
        return false;
    }

    boolean receiveConnection(SocketChannel s) {
        SendWorker sw;
        boolean challenged = true;
        boolean wins = false;
        Long sid = null;
        try {
            byte[] msgBytes = new byte[8];
            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
            s.read(msgBuffer);
            msgBuffer.position(0);
            sid = msgBuffer.getLong();
        }
        catch (IOException e) {
            LOG.warn((Object)("Exception reading or writing challenge: " + e.toString()));
            return false;
        }
        if (sid < this.self.getId()) {
            try {
                SocketChannel channel;
                sw = this.senderWorkerMap.get(sid);
                LOG.warn((Object)"Create new connection");
                s.socket().close();
                if (sw != null) {
                    sw.finish();
                }
                if ((channel = SocketChannel.open(this.self.quorumPeers.get((Object)sid).electionAddr)).isConnected()) {
                    this.initiateConnection(channel, sid);
                }
            }
            catch (IOException e) {
                LOG.warn((Object)("Error when closing socket or trying to reopen connection: " + e.toString()));
            }
        } else {
            if (s != null) {
                sw = new SendWorker(s, sid);
                RecvWorker rw = new RecvWorker(s, sid);
                sw.setRecv(rw);
                if (this.senderWorkerMap.containsKey(sid)) {
                    this.senderWorkerMap.get(sid).finish();
                }
                this.senderWorkerMap.put(sid, sw);
                if (!this.queueSendMap.containsKey(sid)) {
                    this.queueSendMap.put(sid, new ArrayBlockingQueue(100));
                }
                sw.start();
                rw.start();
                return true;
            }
            LOG.warn((Object)"Channel null");
            return false;
        }
        return false;
    }

    void toSend(Long sid, ByteBuffer b) {
        if (this.self.getId() == sid.longValue()) {
            try {
                b.position(0);
                this.recvQueue.put(new Message(b.duplicate(), sid));
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Exception when loopbacking");
            }
        } else {
            try {
                if (!this.queueSendMap.containsKey(sid)) {
                    this.queueSendMap.put(sid, new ArrayBlockingQueue(100));
                    this.queueSendMap.get(sid).put(b);
                } else {
                    if (this.queueSendMap.get(sid).remainingCapacity() == 0) {
                        this.queueSendMap.get(sid).take();
                    }
                    this.queueSendMap.get(sid).put(b);
                }
                if (this.senderWorkerMap.get(sid) == null) {
                    try {
                        SocketChannel channel = SocketChannel.open(this.self.quorumPeers.get((Object)sid).electionAddr);
                        channel.socket().setTcpNoDelay(true);
                        this.initiateConnection(channel, sid);
                    }
                    catch (IOException e) {
                        LOG.warn((Object)("Cannot open channel to " + sid + "( " + e.toString() + ")"));
                    }
                }
            }
            catch (InterruptedException e) {
                LOG.warn((Object)("Interrupted while waiting to put message in queue." + e.toString()));
            }
        }
    }

    boolean haveDelivered() {
        for (ArrayBlockingQueue<ByteBuffer> queue : this.queueSendMap.values()) {
            if (queue.size() != 0) continue;
            return true;
        }
        return false;
    }

    public void halt() {
        this.shutdown = true;
        LOG.warn((Object)"Halting listener");
        this.listener.halt();
        for (SendWorker sw : this.senderWorkerMap.values()) {
            LOG.warn((Object)("Halting sender: " + sw));
            sw.finish();
        }
    }

    class RecvWorker
    extends Thread {
        Long sid;
        SocketChannel channel;
        boolean running = true;

        RecvWorker(SocketChannel channel, Long sid) {
            this.sid = sid;
            this.channel = channel;
        }

        boolean finish() {
            this.running = false;
            this.interrupt();
            return this.running;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            try {
                byte[] size = new byte[4];
                ByteBuffer msgLength = ByteBuffer.wrap(size);
                while (this.running && !QuorumCnxManager.this.shutdown && this.channel.isConnected()) {
                    while (msgLength.hasRemaining()) {
                        this.channel.read(msgLength);
                    }
                    msgLength.position(0);
                    int length = msgLength.getInt();
                    if (length <= 0) continue;
                    byte[] msgArray = new byte[length];
                    ByteBuffer message = ByteBuffer.wrap(msgArray);
                    int numbytes = 0;
                    while (message.hasRemaining()) {
                        numbytes += this.channel.read(message);
                    }
                    message.position(0);
                    ArrayBlockingQueue<Message> arrayBlockingQueue = QuorumCnxManager.this.recvQueue;
                    synchronized (arrayBlockingQueue) {
                        QuorumCnxManager.this.recvQueue.put(new Message(message.duplicate(), this.sid));
                    }
                    msgLength.position(0);
                }
            }
            catch (IOException e) {
                LOG.warn((Object)("Connection broken: " + e.toString()));
            }
            catch (InterruptedException e) {
                LOG.warn((Object)("Interrupted while trying to add new message to the reception queue (" + e.toString() + ")"));
            }
        }
    }

    class SendWorker
    extends Thread {
        Long sid;
        SocketChannel channel;
        RecvWorker recvWorker;
        boolean running = true;

        SendWorker(SocketChannel channel, Long sid) {
            this.sid = sid;
            this.channel = channel;
            this.recvWorker = null;
            LOG.debug((Object)("Address of remote peer: " + this.sid));
        }

        void setRecv(RecvWorker recvWorker) {
            this.recvWorker = recvWorker;
        }

        boolean finish() {
            this.running = false;
            this.interrupt();
            if (this.recvWorker != null) {
                this.recvWorker.finish();
            }
            QuorumCnxManager.this.senderWorkerMap.remove(this.sid);
            return this.running;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            while (this.running && !QuorumCnxManager.this.shutdown) {
                ByteBuffer b = null;
                try {
                    b = QuorumCnxManager.this.queueSendMap.get(this.sid).take();
                }
                catch (InterruptedException e) {
                    LOG.warn((Object)("Interrupted while waiting for message on queue (" + e.toString() + ")"));
                    continue;
                }
                try {
                    byte[] msgBytes = new byte[b.capacity() + 4];
                    ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
                    msgBuffer.putInt(b.capacity());
                    msgBuffer.put(b.array(), 0, b.capacity());
                    msgBuffer.position(0);
                    this.channel.write(msgBuffer);
                }
                catch (IOException e) {
                    LOG.warn((Object)("Exception when using channel: " + this.sid + ")" + e.toString()));
                    this.running = false;
                    ConcurrentHashMap<Long, SendWorker> concurrentHashMap = QuorumCnxManager.this.senderWorkerMap;
                    synchronized (concurrentHashMap) {
                        this.recvWorker.finish();
                        this.recvWorker = null;
                        QuorumCnxManager.this.senderWorkerMap.remove(this.sid);
                        if (QuorumCnxManager.this.queueSendMap.get(this.sid).size() == 0) {
                            QuorumCnxManager.this.queueSendMap.get(this.sid).offer(b);
                        }
                    }
                }
            }
            LOG.warn((Object)"Leaving thread");
        }
    }

    class Listener
    extends Thread {
        ServerSocketChannel ss = null;

        Listener() {
        }

        public void run() {
            ServerSocketChannel ss = null;
            try {
                ss = ServerSocketChannel.open();
                int port = QuorumCnxManager.this.self.quorumPeers.get((Object)Long.valueOf((long)QuorumCnxManager.this.self.getId())).electionAddr.getPort();
                LOG.warn((Object)("My election bind port: " + port));
                ss.socket().bind(new InetSocketAddress(port));
                while (!QuorumCnxManager.this.shutdown) {
                    SocketChannel client = ss.accept();
                    client.socket().setTcpNoDelay(true);
                    LOG.warn((Object)"Connection request");
                    QuorumCnxManager.this.receiveConnection(client);
                }
            }
            catch (IOException e) {
                System.err.println("Listener.run: " + e.getMessage());
            }
        }

        void halt() {
            try {
                if (this.ss != null) {
                    this.ss.close();
                }
            }
            catch (IOException e) {
                LOG.warn((Object)("Exception when shutting down listener: " + e));
            }
        }
    }

    static class Message {
        ByteBuffer buffer;
        long sid;

        Message(ByteBuffer buffer, long sid) {
            this.buffer = buffer;
            this.sid = sid;
        }
    }
}

