/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper.server.quorum;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.jute.BinaryOutputArchive;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.quorum.FollowerHandler;
import org.apache.zookeeper.server.quorum.FollowerSyncRequest;
import org.apache.zookeeper.server.quorum.LeaderBean;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.zookeeper.server.quorum.QuorumPeer;

public class Leader {
    private static final Logger LOG = Logger.getLogger(Leader.class);
    private static final boolean nodelay = System.getProperty("leader.nodelay", "true").equals("true");
    LeaderZooKeeperServer zk;
    QuorumPeer self;
    FollowerCnxAcceptor cnxAcceptor;
    public HashSet<FollowerHandler> followers = new HashSet();
    public HashSet<FollowerHandler> forwardingFollowers = new HashSet();
    public HashMap<Long, List<FollowerSyncRequest>> pendingSyncs = new HashMap();
    AtomicLong followerCounter = new AtomicLong(-1L);
    ServerSocket ss;
    static final int DIFF = 13;
    static final int TRUNC = 14;
    static final int SNAP = 15;
    static final int NEWLEADER = 10;
    static final int FOLLOWERINFO = 11;
    static final int UPTODATE = 12;
    static final int REQUEST = 1;
    public static final int PROPOSAL = 2;
    static final int ACK = 3;
    static final int COMMIT = 4;
    static final int PING = 5;
    static final int REVALIDATE = 6;
    static final int SYNC = 7;
    private ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
    ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue();
    Proposal newLeaderProposal = new Proposal();
    boolean isShutdown;
    long lastCommitted = -1L;
    long lastProposed;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addFollowerHandler(FollowerHandler follower) {
        HashSet<FollowerHandler> hashSet = this.followers;
        synchronized (hashSet) {
            this.followers.add(follower);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void removeFollowerHandler(FollowerHandler follower) {
        HashSet<FollowerHandler> hashSet = this.forwardingFollowers;
        synchronized (hashSet) {
            this.forwardingFollowers.remove(follower);
        }
        hashSet = this.followers;
        synchronized (hashSet) {
            this.followers.remove(follower);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean isFollowerSynced(FollowerHandler follower) {
        HashSet<FollowerHandler> hashSet = this.forwardingFollowers;
        synchronized (hashSet) {
            return this.forwardingFollowers.contains(follower);
        }
    }

    Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
        this.self = self;
        try {
            this.ss = new ServerSocket(self.getQuorumAddress().getPort());
        }
        catch (BindException e) {
            LOG.error((Object)("Couldn't bind to port " + self.getQuorumAddress().getPort()), (Throwable)e);
            throw e;
        }
        this.zk = zk;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void lead() throws IOException, InterruptedException {
        this.zk.registerJMX(new LeaderBean(this, this.zk), this.self.jmxLocalPeerBean);
        try {
            this.self.tick = 0;
            this.zk.loadData();
            this.zk.startup();
            long epoch = this.self.getLastLoggedZxid() >> 32;
            this.zk.setZxid(++epoch << 32);
            this.zk.dataTree.lastProcessedZxid = this.zk.getZxid();
            Leader leader = this;
            synchronized (leader) {
                this.lastProposed = this.zk.getZxid();
            }
            this.newLeaderProposal.packet = new QuorumPacket(10, this.zk.getZxid(), null, null);
            if ((this.newLeaderProposal.packet.getZxid() & 0xFFFFFFFFL) != 0L) {
                LOG.info((Object)("NEWLEADER proposal has Zxid of " + Long.toHexString(this.newLeaderProposal.packet.getZxid())));
            }
            this.outstandingProposals.put(this.newLeaderProposal.packet.getZxid(), this.newLeaderProposal);
            this.cnxAcceptor = new FollowerCnxAcceptor();
            this.cnxAcceptor.start();
            this.newLeaderProposal.ackSet.add(this.self.getId());
            while (!this.self.getQuorumVerifier().containsQuorum(this.newLeaderProposal.ackSet)) {
                if (this.self.tick > this.self.initLimit) {
                    StringBuffer ackToString = new StringBuffer();
                    for (Long id : this.newLeaderProposal.ackSet) {
                        ackToString.append(id + ": ");
                    }
                    this.shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
                    HashSet<Long> followerSet = new HashSet<Long>();
                    for (FollowerHandler f : this.followers) {
                        followerSet.add(f.getSid());
                    }
                    if (this.self.getQuorumVerifier().containsQuorum(followerSet)) {
                        LOG.warn((Object)"Enough followers present. Perhaps the initTicks need to be increased.");
                    }
                    return;
                }
                Thread.sleep(this.self.tickTime);
                ++this.self.tick;
            }
            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                this.self.cnxnFactory.setZooKeeperServer(this.zk);
            }
            boolean tickSkip = true;
            while (true) {
                Thread.sleep(this.self.tickTime / 2);
                if (!tickSkip) {
                    ++this.self.tick;
                }
                int syncedCount = 0;
                HashSet<Long> syncedSet = new HashSet<Long>();
                syncedSet.add(this.self.getId());
                HashSet<FollowerHandler> hashSet = this.followers;
                synchronized (hashSet) {
                    for (FollowerHandler f : this.followers) {
                        if (f.synced()) {
                            ++syncedCount;
                            syncedSet.add(f.getSid());
                        }
                        f.ping();
                    }
                }
                if (!tickSkip && !this.self.getQuorumVerifier().containsQuorum(syncedSet)) {
                    this.shutdown("Only " + syncedCount + " followers, need " + this.self.quorumPeers.size() / 2);
                    return;
                }
                tickSkip = !tickSkip;
            }
        }
        finally {
            this.zk.unregisterJMX(this);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void shutdown(String reason) {
        if (this.isShutdown) {
            return;
        }
        LOG.info((Object)"Shutdown called", (Throwable)new Exception("shutdown Leader! reason: " + reason));
        if (this.cnxAcceptor != null) {
            this.cnxAcceptor.halt();
        }
        this.self.cnxnFactory.setZooKeeperServer(null);
        this.self.cnxnFactory.clear();
        if (this.zk != null) {
            this.zk.shutdown();
        }
        try {
            this.ss.close();
        }
        catch (IOException e) {
            LOG.warn((Object)"Ignoring unexpected exception during close", (Throwable)e);
        }
        HashSet<FollowerHandler> hashSet = this.followers;
        synchronized (hashSet) {
            Iterator<FollowerHandler> it = this.followers.iterator();
            while (it.hasNext()) {
                FollowerHandler f = it.next();
                it.remove();
                f.shutdown();
            }
        }
        this.isShutdown = true;
    }

    public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
        boolean first = true;
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)("Ack zxid: 0x" + Long.toHexString(zxid)));
            for (Proposal p : this.outstandingProposals.values()) {
                long packetZxid = p.packet.getZxid();
                LOG.trace((Object)("outstanding proposal: 0x" + Long.toHexString(packetZxid)));
            }
            LOG.trace((Object)"outstanding proposals all");
        }
        if (this.outstandingProposals.size() == 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"outstanding is 0");
            }
            return;
        }
        if (this.lastCommitted >= zxid) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("proposal has already been committed, pzxid:" + this.lastCommitted + " zxid: 0x" + Long.toHexString(zxid)));
            }
            return;
        }
        Proposal p = (Proposal)this.outstandingProposals.get(zxid);
        if (p == null) {
            LOG.warn((Object)("Trying to commit future proposal: zxid 0x" + Long.toHexString(zxid) + " from " + followerAddr));
            return;
        }
        p.ackSet.add(sid);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Count for zxid: 0x" + Long.toHexString(zxid) + " is " + p.ackSet.size()));
        }
        if (this.self.getQuorumVerifier().containsQuorum(p.ackSet)) {
            if (zxid != this.lastCommitted + 1L) {
                LOG.warn((Object)("Commiting zxid 0x" + Long.toHexString(zxid) + " from " + followerAddr + " not first!"));
                LOG.warn((Object)("First is " + (this.lastCommitted + 1L)));
            }
            this.outstandingProposals.remove(zxid);
            if (p.request != null) {
                this.toBeApplied.add(p);
            }
            if ((zxid & 0xFFFFFFFFL) != 0L) {
                if (p.request == null) {
                    LOG.warn((Object)("Going to commmit null: " + p));
                }
                this.commit(zxid);
                this.zk.commitProcessor.commit(p.request);
                if (this.pendingSyncs.containsKey(zxid)) {
                    for (FollowerSyncRequest r : this.pendingSyncs.remove(zxid)) {
                        this.sendSync(r);
                    }
                }
                return;
            }
            this.lastCommitted = zxid;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void sendPacket(QuorumPacket qp) {
        HashSet<FollowerHandler> hashSet = this.forwardingFollowers;
        synchronized (hashSet) {
            for (FollowerHandler f : this.forwardingFollowers) {
                f.queuePacket(qp);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void commit(long zxid) {
        Leader leader = this;
        synchronized (leader) {
            this.lastCommitted = zxid;
        }
        QuorumPacket qp = new QuorumPacket(4, zxid, null, null);
        this.sendPacket(qp);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Proposal propose(Request request) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        try {
            request.hdr.serialize(boa, "hdr");
            if (request.txn != null) {
                request.txn.serialize(boa, "txn");
            }
            baos.close();
        }
        catch (IOException e) {
            LOG.warn((Object)"This really should be impossible", (Throwable)e);
        }
        QuorumPacket pp = new QuorumPacket(2, request.zxid, baos.toByteArray(), null);
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        Leader leader = this;
        synchronized (leader) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Proposing:: " + request));
            }
            this.lastProposed = p.packet.getZxid();
            this.outstandingProposals.put(this.lastProposed, p);
            this.sendPacket(pp);
        }
        return p;
    }

    public synchronized void processSync(FollowerSyncRequest r) {
        if (this.outstandingProposals.isEmpty()) {
            this.sendSync(r);
        } else {
            List<FollowerSyncRequest> l = this.pendingSyncs.get(this.lastProposed);
            if (l == null) {
                l = new ArrayList<FollowerSyncRequest>();
            }
            l.add(r);
            this.pendingSyncs.put(this.lastProposed, l);
        }
    }

    public void sendSync(FollowerSyncRequest r) {
        QuorumPacket qp = new QuorumPacket(7, 0L, null, null);
        r.fh.queuePacket(qp);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized long startForwarding(FollowerHandler handler, long lastSeenZxid) {
        if (this.lastProposed > lastSeenZxid) {
            for (Proposal p : this.toBeApplied) {
                if (p.packet.getZxid() <= lastSeenZxid) continue;
                handler.queuePacket(p.packet);
                QuorumPacket qp = new QuorumPacket(4, p.packet.getZxid(), null, null);
                handler.queuePacket(qp);
            }
            ArrayList zxids = new ArrayList(this.outstandingProposals.keySet());
            Collections.sort(zxids);
            for (Long zxid : zxids) {
                if (zxid <= lastSeenZxid) continue;
                handler.queuePacket(((Proposal)this.outstandingProposals.get((Object)zxid)).packet);
            }
        }
        HashSet<FollowerHandler> hashSet = this.forwardingFollowers;
        synchronized (hashSet) {
            this.forwardingFollowers.add(handler);
        }
        return this.lastProposed;
    }

    static {
        LOG.info((Object)("TCP NoDelay set to: " + nodelay));
    }

    static class ToBeAppliedRequestProcessor
    implements RequestProcessor {
        private RequestProcessor next;
        private ConcurrentLinkedQueue<Proposal> toBeApplied;

        ToBeAppliedRequestProcessor(RequestProcessor next, ConcurrentLinkedQueue<Proposal> toBeApplied) {
            if (!(next instanceof FinalRequestProcessor)) {
                throw new RuntimeException(ToBeAppliedRequestProcessor.class.getName() + " must be connected to " + FinalRequestProcessor.class.getName() + " not " + next.getClass().getName());
            }
            this.toBeApplied = toBeApplied;
            this.next = next;
        }

        @Override
        public void processRequest(Request request) {
            this.next.processRequest(request);
            Proposal p = this.toBeApplied.peek();
            if (p != null && p.request != null && p.request.zxid == request.zxid) {
                this.toBeApplied.remove();
            }
        }

        @Override
        public void shutdown() {
            this.next.shutdown();
        }
    }

    class FollowerCnxAcceptor
    extends Thread {
        private volatile boolean stop = false;

        FollowerCnxAcceptor() {
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        @Override
        public void run() {
            try {
                while (!this.stop) {
                    try {
                        Socket s = Leader.this.ss.accept();
                        s.setSoTimeout(Leader.this.self.tickTime * Leader.this.self.syncLimit);
                        s.setTcpNoDelay(nodelay);
                        FollowerHandler fh = new FollowerHandler(s, Leader.this);
                        fh.start();
                    }
                    catch (SocketException e) {
                        if (!this.stop) throw e;
                        LOG.info((Object)("exception while shutting down acceptor: " + e));
                        this.stop = true;
                        continue;
                        return;
                    }
                }
            }
            catch (Exception e) {
                LOG.warn((Object)"Exception while accepting follower", (Throwable)e);
            }
        }

        public void halt() {
            this.stop = true;
        }
    }

    public static class Proposal {
        public QuorumPacket packet;
        public HashSet<Long> ackSet = new HashSet();
        public Request request;

        public String toString() {
            return this.packet.getType() + ", " + this.packet.getZxid() + ", " + this.request;
        }
    }
}

