/*
 * Decompiled with CFR 0.152.
 */
package oadd.org.apache.zookeeper.server.quorum;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import javax.security.sasl.SaslException;
import oadd.org.apache.jute.BinaryInputArchive;
import oadd.org.apache.jute.BinaryOutputArchive;
import oadd.org.apache.zookeeper.KeeperException;
import oadd.org.apache.zookeeper.server.ByteBufferInputStream;
import oadd.org.apache.zookeeper.server.Request;
import oadd.org.apache.zookeeper.server.ZooKeeperThread;
import oadd.org.apache.zookeeper.server.ZooTrace;
import oadd.org.apache.zookeeper.server.quorum.Leader;
import oadd.org.apache.zookeeper.server.quorum.LearnerInfo;
import oadd.org.apache.zookeeper.server.quorum.LearnerSyncRequest;
import oadd.org.apache.zookeeper.server.quorum.QuorumPacket;
import oadd.org.apache.zookeeper.server.quorum.QuorumPeer;
import oadd.org.apache.zookeeper.server.quorum.StateSummary;
import oadd.org.apache.zookeeper.server.util.SerializeUtils;
import oadd.org.apache.zookeeper.server.util.ZxidUtils;
import oadd.org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LearnerHandler
extends ZooKeeperThread {
    private static final Logger LOG = LoggerFactory.getLogger(LearnerHandler.class);
    /** Connection to the learner served by this handler; closed by {@link #shutdown()}. */
    protected final Socket sock;
    /** Leader this handler registers with in {@link #run()} and deregisters from in {@link #shutdown()}. */
    final Leader leader;
    /**
     * Leader tick after which {@link #synced()} reports {@code false}. Written by the handler
     * thread in {@link #run()} each time a packet arrives; volatile because it is exposed to
     * other callers through {@link #tickOfNextAckDeadline()}.
     */
    volatile long tickOfNextAckDeadline;
    /** Server id of the learner; stays 0 until {@link #run()} decodes the first packet. */
    protected long sid = 0L;
    /** Protocol version announced by the learner; {@link #run()} compares it against 65536 (0x10000). */
    protected int version = 1;
    /** Outgoing packets, drained in order by {@code sendPackets()}. */
    final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
    /** Tracks proposal/ack timing; consulted by {@link #ping()}. */
    private final SyncLimitCheck syncLimitCheck = new SyncLimitCheck();
    /** Archive reading from {@link #bufferedInput}; created in {@link #run()}. */
    private BinaryInputArchive ia;
    /** Archive writing to {@link #bufferedOutput}; created in {@link #run()}. */
    private BinaryOutputArchive oa;
    private final BufferedInputStream bufferedInput;
    private BufferedOutputStream bufferedOutput;
    /**
     * Sentinel packet: {@code sendPackets()} stops as soon as it dequeues this exact instance
     * (identity comparison). It is enqueued by {@link #shutdown()}.
     */
    final QuorumPacket proposalOfDeath = new QuorumPacket();
    /** PARTICIPANT unless the learner's first packet has type 16, in which case OBSERVER. */
    private QuorumPeer.LearnerType learnerType = QuorumPeer.LearnerType.PARTICIPANT;

    /**
     * @return the socket connected to the learner this handler serves
     */
    public Socket getSocket() {
        return sock;
    }

    /**
     * @return the learner's server id (0 until {@code run()} has decoded the first packet)
     */
    long getSid() {
        return sid;
    }

    /**
     * @return the protocol version recorded for the learner (defaults to 1)
     */
    int getVersion() {
        return version;
    }

    /**
     * Creates a handler for one learner connection and authenticates the peer.
     *
     * <p>If authentication fails, the socket is closed (best effort) and the failure is
     * rethrown as a {@link SaslException} that carries the original exception as its cause.
     *
     * @param sock          socket connected to the learner; also used to name the thread
     * @param bufferedInput buffered view of the socket input, shared with the authenticator
     *                      and later with the input archive created in {@code run()}
     * @param leader        leader that owns this handler
     * @throws IOException  (as {@link SaslException}) when the learner cannot be authenticated
     */
    LearnerHandler(Socket sock, BufferedInputStream bufferedInput, Leader leader) throws IOException {
        super("LearnerHandler-" + sock.getRemoteSocketAddress());
        this.sock = sock;
        this.leader = leader;
        this.bufferedInput = bufferedInput;
        try {
            leader.self.authServer.authenticate(sock, new DataInputStream(bufferedInput));
        }
        catch (IOException e) {
            LOG.error("Server failed to authenticate quorum learner, addr: {}, closing connection", (Object)sock.getRemoteSocketAddress(), (Object)e);
            try {
                sock.close();
            }
            catch (IOException ie) {
                LOG.error("Exception while closing socket", ie);
            }
            // Chain the original failure so callers keep the full stack trace.
            throw new SaslException("Authentication failure: " + e.getMessage(), e);
        }
    }

    /**
     * Describes this handler: its socket, the current ack deadline tick, whether the learner
     * counts as synced, and how many packets are waiting to be sent.
     */
    @Override
    public String toString() {
        return "LearnerHandler " + this.sock
                + " tickOfNextAckDeadline:" + this.tickOfNextAckDeadline()
                + " synced?:" + this.synced()
                + " queuedPacketLength:" + this.queuedPackets.size();
    }

    /**
     * @return the role recorded for the learner: PARTICIPANT by default, OBSERVER once
     *         {@code run()} has seen a first packet of type 16
     */
    public QuorumPeer.LearnerType getLearnerType() {
        return learnerType;
    }

    /**
     * Drains {@link #queuedPackets} and writes each packet to the learner until the
     * {@link #proposalOfDeath} sentinel is dequeued or an I/O error occurs.
     *
     * <p>The output buffer is flushed whenever the queue runs empty, just before blocking
     * for the next packet. On an I/O error the socket is closed unless it already is.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for a packet
     */
    private void sendPackets() throws InterruptedException {
        try {
            while (true) {
                QuorumPacket p = this.queuedPackets.poll();
                if (p == null) {
                    // Nothing pending: push out what has been buffered, then block.
                    this.bufferedOutput.flush();
                    p = this.queuedPackets.take();
                }
                if (p == this.proposalOfDeath) {
                    // Sentinel enqueued by shutdown(): stop sending.
                    return;
                }
                if (p.getType() == 2) {
                    // PROPOSAL: remember when it was sent so ping() can detect a missing ack.
                    this.syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
                }
                if (LOG.isTraceEnabled()) {
                    // Pick the mask per packet (128 for PING, 16 otherwise), as run() does
                    // for incoming packets, so one PING does not switch every later packet
                    // to the ping mask.
                    long traceMask = p.getType() == 5 ? 128L : 16L;
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
                }
                this.oa.writeRecord(p, "packet");
            }
        }
        catch (IOException e) {
            if (this.sock.isClosed()) {
                // Expected once the connection has been shut down; nothing to report.
                return;
            }
            LOG.warn("Unexpected exception at " + this, e);
            try {
                this.sock.close();
            }
            catch (IOException ie) {
                LOG.warn("Error closing socket for handler " + this, ie);
            }
        }
    }

    /**
     * Renders a quorum packet for log output as
     * {@code "<TYPE> <zxid in hex> <detail>"}.
     *
     * <p>Only REVALIDATE packets carry a detail ({@code " sessionid = <id>"}). For every other
     * type, and for a REVALIDATE packet whose payload is missing or unreadable, the detail
     * slot is rendered as the literal text {@code null}, because a null reference is
     * concatenated. Unrecognised types are rendered as {@code UNKNOWN<type>}.
     *
     * <p>A packet without payload is tolerated: this is a logging helper and must not throw
     * just because the packet being described is malformed.
     *
     * @param p packet to describe; must not be {@code null}
     * @return the description, never {@code null}
     */
    public static String packetToString(QuorumPacket p) {
        String type;
        String mess = null;
        switch (p.getType()) {
            case 3: {
                type = "ACK";
                break;
            }
            case 4: {
                type = "COMMIT";
                break;
            }
            case 11: {
                type = "FOLLOWERINFO";
                break;
            }
            case 10: {
                type = "NEWLEADER";
                break;
            }
            case 5: {
                type = "PING";
                break;
            }
            case 2: {
                type = "PROPOSAL";
                if (p.getData() != null) {
                    // The decoded transaction is not used; the call only surfaces a
                    // warning when the payload cannot be deserialized.
                    try {
                        SerializeUtils.deserializeTxn(p.getData(), new TxnHeader());
                    }
                    catch (IOException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                break;
            }
            case 1: {
                type = "REQUEST";
                break;
            }
            case 6: {
                type = "REVALIDATE";
                if (p.getData() != null) {
                    // The payload starts with the session id as a long.
                    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(p.getData()));
                    try {
                        long id = dis.readLong();
                        mess = " sessionid = " + id;
                    }
                    catch (IOException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                break;
            }
            case 12: {
                type = "UPTODATE";
                break;
            }
            default: {
                type = "UNKNOWN" + p.getType();
            }
        }
        return type + " " + Long.toHexString(p.getZxid()) + " " + mess;
    }

    /**
     * Thread body: performs the handshake with one learner, decides how to bring it up to
     * date with the leader, then serves the learner's packets until the connection fails.
     *
     * <p>Packet type numbers used below, as named by {@link #packetToString} and by the log
     * messages in this method: 1 REQUEST, 2 PROPOSAL, 3 ACK, 4 COMMIT, 5 PING, 6 REVALIDATE,
     * 10 NEWLEADER, 11 FOLLOWERINFO, 12 UPTODATE, 14 TRUNC, 15 snapshot, 16 OBSERVERINFO,
     * 18 ACKEPOCH.
     *
     * <p>NOTE(review): this is raw decompiler output and is not compilable as written: local
     * variables are undeclared and a {@code ** GOTO} pseudo-instruction is present. In this
     * form the handshake statements sit outside the {@code try}, although they perform I/O;
     * the original source presumably wrapped the whole body in one try/catch/finally.
     * Confirm against the original source before relying on the exception paths shown here.
     *
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    @Override
    public void run() {
        block58: {
            block56: {
                block60: {
                    block55: {
                        // Register with the leader and give the learner initLimit + syncLimit
                        // ticks from now before it is considered out of sync.
                        this.leader.addLearnerHandler(this);
                        this.tickOfNextAckDeadline = this.leader.self.tick.get() + this.leader.self.initLimit + this.leader.self.syncLimit;
                        this.ia = BinaryInputArchive.getArchive(this.bufferedInput);
                        this.bufferedOutput = new BufferedOutputStream(this.sock.getOutputStream());
                        this.oa = BinaryOutputArchive.getArchive(this.bufferedOutput);
                        // The first packet must be FOLLOWERINFO (11) or OBSERVERINFO (16).
                        qp = new QuorumPacket();
                        this.ia.readRecord(qp, "packet");
                        if (qp.getType() == 11 || qp.getType() == 16) break block55;
                        LearnerHandler.LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!");
                        LearnerHandler.LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
                        this.shutdown();
                        return;
                    }
                    // Decode the learner identity from the payload:
                    //  - exactly 8 bytes: the server id as a long;
                    //  - any other length: a serialized LearnerInfo (server id + protocol version);
                    //  - no payload: take an id from the leader's followerCounter.
                    learnerInfoData = qp.getData();
                    if (learnerInfoData != null) {
                        if (learnerInfoData.length == 8) {
                            bbsid = ByteBuffer.wrap(learnerInfoData);
                            this.sid = bbsid.getLong();
                        } else {
                            li = new LearnerInfo();
                            ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
                            this.sid = li.getServerid();
                            this.version = li.getProtocolVersion();
                        }
                    } else {
                        this.sid = this.leader.followerCounter.getAndDecrement();
                    }
                    LearnerHandler.LOG.info("Follower sid: " + this.sid + " : info : " + this.leader.self.quorumPeers.get(this.sid));
                    if (qp.getType() == 16) {
                        this.learnerType = QuorumPeer.LearnerType.OBSERVER;
                    }
                    // The zxid of the first packet carries the learner's last accepted epoch.
                    lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
                    ss = null;
                    zxid = qp.getZxid();
                    newEpoch = this.leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
                    // Learners announcing a version below 0x10000 skip the explicit epoch
                    // exchange: their state summary is built from the first packet's zxid.
                    if (this.getVersion() >= 65536) break block60;
                    epoch = ZxidUtils.getEpochFromZxid(zxid);
                    ss = new StateSummary(epoch, zxid);
                    this.leader.waitForEpochAck(this.getSid(), ss);
                    ** GOTO lbl59
                }
                // Newer learners: send a type-17 packet carrying the proposed epoch (in the
                // zxid) and the version 0x10000 (in the payload), then expect ACKEPOCH (18).
                // NOTE(review): type 17 is not named anywhere in this file.
                ver = new byte[4];
                ByteBuffer.wrap(ver).putInt(65536);
                newEpochPacket = new QuorumPacket(17, ZxidUtils.makeZxid(newEpoch, 0L), ver, null);
                this.oa.writeRecord(newEpochPacket, "packet");
                this.bufferedOutput.flush();
                ackEpochPacket = new QuorumPacket();
                this.ia.readRecord(ackEpochPacket, "packet");
                if (ackEpochPacket.getType() == 18) break block56;
                LearnerHandler.LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH");
                LearnerHandler.LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
                this.shutdown();
                return;
            }
            // The ACKEPOCH payload starts with an int epoch; its zxid is the learner's last zxid.
            bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
            this.leader.waitForEpochAck(this.getSid(), ss);
lbl59:
            // 2 sources

            // Decide how to synchronize the learner. The default is 15 (full snapshot); the
            // branches below may switch to 13 or 14 (TRUNC).
            // NOTE(review): type 13 is not named in this file; it is chosen when the learner
            // is already in sync or only needs queued proposals (presumably DIFF - confirm).
            peerLastZxid = ss.getLastZxid();
            packetToSend = 15;
            zxidToSend = 0L;
            leaderLastZxid = 0L;
            updates = peerLastZxid;
            // Hold the read side of the database log lock while inspecting the committed log.
            lock = this.leader.zk.getZKDatabase().getLogLock();
            rl = lock.readLock();
            try {
                rl.lock();
                maxCommittedLog = this.leader.zk.getZKDatabase().getmaxCommittedLog();
                minCommittedLog = this.leader.zk.getZKDatabase().getminCommittedLog();
                LearnerHandler.LOG.info("Synchronizing with Follower sid: " + this.sid + " maxCommittedLog=0x" + Long.toHexString(maxCommittedLog) + " minCommittedLog=0x" + Long.toHexString(minCommittedLog) + " peerLastZxid=0x" + Long.toHexString(peerLastZxid));
                proposals = this.leader.zk.getZKDatabase().getCommittedLog();
                if (peerLastZxid == this.leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                    // Learner is exactly at the leader's last processed zxid.
                    LearnerHandler.LOG.info("leader and follower are in sync, zxid=0x{}", (Object)Long.toHexString(peerLastZxid));
                    packetToSend = 13;
                    zxidToSend = peerLastZxid;
                } else if (proposals.size() != 0) {
                    LearnerHandler.LOG.debug("proposal size is {}", (Object)proposals.size());
                    if (maxCommittedLog >= peerLastZxid && minCommittedLog <= peerLastZxid) {
                        // The learner's zxid lies within the committed log: queue every
                        // later proposal, each followed by its COMMIT (4).
                        LearnerHandler.LOG.debug("Sending proposals to follower");
                        prevProposalZxid = minCommittedLog;
                        firstPacket = true;
                        packetToSend = 13;
                        zxidToSend = maxCommittedLog;
                        for (Leader.Proposal propose : proposals) {
                            if (propose.packet.getZxid() <= peerLastZxid) {
                                prevProposalZxid = propose.packet.getZxid();
                                continue;
                            }
                            if (firstPacket) {
                                firstPacket = false;
                                // The learner's zxid is not itself in the log (it falls
                                // between two entries): switch to TRUNC back to the last
                                // logged zxid at or below it.
                                if (prevProposalZxid < peerLastZxid) {
                                    packetToSend = 14;
                                    updates = zxidToSend = prevProposalZxid;
                                }
                            }
                            this.queuePacket(propose.packet);
                            qcommit = new QuorumPacket(4, propose.packet.getZxid(), null, null);
                            this.queuePacket(qcommit);
                        }
                    } else if (peerLastZxid > maxCommittedLog) {
                        // Learner is ahead of the committed log: TRUNC to maxCommittedLog.
                        // Note the debug line logs `updates` before it is reassigned below.
                        LearnerHandler.LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}", (Object)Long.toHexString(maxCommittedLog), (Object)Long.toHexString(updates));
                        packetToSend = 14;
                        updates = zxidToSend = maxCommittedLog;
                    } else {
                        // Learner is older than minCommittedLog: packetToSend stays 15.
                        LearnerHandler.LOG.warn("Unhandled proposal scenario");
                    }
                } else {
                    LearnerHandler.LOG.debug("proposals is empty");
                }
                LearnerHandler.LOG.info("Sending " + Leader.getPacketType(packetToSend));
                leaderLastZxid = this.leader.startForwarding(this, updates);
            }
            finally {
                rl.unlock();
            }
            // NEWLEADER (10): written directly for learners below version 0x10000, queued
            // behind the already queued packets otherwise.
            newLeaderQP = new QuorumPacket(10, ZxidUtils.makeZxid(newEpoch, 0L), null, null);
            if (this.getVersion() < 65536) {
                this.oa.writeRecord(newLeaderQP, "packet");
            } else {
                this.queuedPackets.add(newLeaderQP);
            }
            this.bufferedOutput.flush();
            if (packetToSend == 15) {
                zxidToSend = this.leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
            }
            // Announce the chosen sync mode; for a snapshot, stream the database followed by
            // the "BenWasHere" signature string.
            this.oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
            this.bufferedOutput.flush();
            if (packetToSend == 15) {
                LearnerHandler.LOG.info("Sending snapshot last zxid of peer is 0x" + Long.toHexString(peerLastZxid) + "  zxid of leader is 0x" + Long.toHexString(leaderLastZxid) + "sent zxid of db as 0x" + Long.toHexString(zxidToSend));
                this.leader.zk.getZKDatabase().serializeSnapshot(this.oa);
                this.oa.writeString("BenWasHere", "signature");
            }
            this.bufferedOutput.flush();
            // From here on, outgoing packets are written by a dedicated sender thread.
            new Thread(){

                @Override
                public void run() {
                    Thread.currentThread().setName("Sender-" + LearnerHandler.this.sock.getRemoteSocketAddress());
                    try {
                        LearnerHandler.this.sendPackets();
                    }
                    catch (InterruptedException e) {
                        LOG.warn("Unexpected interruption", e);
                    }
                }
            }.start();
            // The learner must answer with an ACK (3).
            qp = new QuorumPacket();
            this.ia.readRecord(qp, "packet");
            if (qp.getType() == 3) break block58;
            LearnerHandler.LOG.error("Next packet was supposed to be an ACK");
            LearnerHandler.LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
            this.shutdown();
            return;
        }
        try {
            LearnerHandler.LOG.info("Received NEWLEADER-ACK message from " + this.getSid());
            this.leader.waitForNewLeaderAck(this.getSid(), qp.getZxid(), this.getLearnerType());
            // Start enforcing the ack timeout and bound socket reads to tickTime * syncLimit.
            this.syncLimitCheck.start();
            this.sock.setSoTimeout(this.leader.self.tickTime * this.leader.self.syncLimit);
            // Wait (polling every 20 ms) until the leader's server is running or this
            // thread is interrupted.
            var22_35 = this.leader.zk;
            synchronized (var22_35) {
                while (!this.leader.zk.isRunning() && !this.isInterrupted()) {
                    this.leader.zk.wait(20L);
                }
            }
            // UPTODATE (12) goes through the queue so it follows everything queued so far.
            this.queuedPackets.add(new QuorumPacket(12, -1L, null, null));
            // Main loop: one iteration per packet received from the learner.
            block27: while (true) {
                qp = new QuorumPacket();
                this.ia.readRecord(qp, "packet");
                traceMask = 16L;
                if (qp.getType() == 5) {
                    traceMask = 128L;
                }
                if (LearnerHandler.LOG.isTraceEnabled()) {
                    ZooTrace.logQuorumPacket(LearnerHandler.LOG, traceMask, 'i', qp);
                }
                // Any packet from the learner pushes the deadline out by syncLimit ticks.
                this.tickOfNextAckDeadline = this.leader.self.tick.get() + this.leader.self.syncLimit;
                switch (qp.getType()) {
                    case 3: {
                        // ACK: clear the pending-proposal timer and hand the ack to the leader.
                        if (this.learnerType == QuorumPeer.LearnerType.OBSERVER && LearnerHandler.LOG.isDebugEnabled()) {
                            LearnerHandler.LOG.debug("Received ACK from Observer  " + this.sid);
                        }
                        this.syncLimitCheck.updateAck(qp.getZxid());
                        this.leader.processAck(this.sid, qp.getZxid(), this.sock.getLocalSocketAddress());
                        continue block27;
                    }
                    case 5: {
                        // PING: the payload is a sequence of (long session id, int timeout)
                        // pairs; touch each session.
                        bis = new ByteArrayInputStream(qp.getData());
                        dis = new DataInputStream(bis);
                        while (true) {
                            if (dis.available() <= 0) continue block27;
                            sess = dis.readLong();
                            to = dis.readInt();
                            this.leader.zk.touch(sess, to);
                        }
                    }
                    case 6: {
                        // REVALIDATE: the payload is (long session id, int timeout). Reply by
                        // reusing the packet with payload (long session id, boolean valid).
                        bis = new ByteArrayInputStream(qp.getData());
                        dis = new DataInputStream(bis);
                        id = dis.readLong();
                        to = dis.readInt();
                        bos = new ByteArrayOutputStream();
                        dos = new DataOutputStream(bos);
                        dos.writeLong(id);
                        valid = this.leader.zk.touch(id, to);
                        if (valid) {
                            try {
                                this.leader.zk.setOwner(id, this);
                            }
                            catch (KeeperException.SessionExpiredException e) {
                                LearnerHandler.LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
                            }
                        }
                        if (LearnerHandler.LOG.isTraceEnabled()) {
                            ZooTrace.logTraceMessage(LearnerHandler.LOG, 32L, "Session 0x" + Long.toHexString(id) + " is valid: " + valid);
                        }
                        dos.writeBoolean(valid);
                        qp.setData(bos.toByteArray());
                        this.queuedPackets.add(qp);
                        continue block27;
                    }
                    case 1: {
                        // REQUEST: the payload is (long session id, int cxid, int type)
                        // followed by the request body. Type 9 is wrapped in a
                        // LearnerSyncRequest (presumably the sync op code - confirm).
                        bb = ByteBuffer.wrap(qp.getData());
                        sessionId = bb.getLong();
                        cxid = bb.getInt();
                        type = bb.getInt();
                        bb = bb.slice();
                        si = type == 9 ? new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo()) : new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                        si.setOwner(this);
                        this.leader.zk.submitRequest(si);
                        continue block27;
                    }
                }
                // Any other type is logged and ignored.
                LearnerHandler.LOG.warn("unexpected quorum packet, type: {}", (Object)LearnerHandler.packetToString(qp));
            }
        }
        catch (IOException e) {
            // Only report the error when the socket was not already closed on purpose.
            if (this.sock != null && !this.sock.isClosed()) {
                LearnerHandler.LOG.error("Unexpected exception causing shutdown while sock still open", e);
                try {
                    this.sock.close();
                }
                catch (IOException var2_5) {
                    // empty catch block
                }
            }
            LearnerHandler.LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
            this.shutdown();
        }
        catch (InterruptedException e) {
            // NOTE(review): the nested try/catch(Throwable) that repeats the GOODBYE log and
            // shutdown() looks like the decompiler's expansion of a finally block - confirm.
            try {
                LearnerHandler.LOG.error("Unexpected exception causing shutdown", e);
                LearnerHandler.LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
                this.shutdown();
            }
            catch (Throwable var38_46) {
                LearnerHandler.LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
                this.shutdown();
                throw var38_46;
            }
        }
    }

    /**
     * Stops this handler: tells the sender to quit, closes the socket, interrupts the handler
     * thread and removes the handler from the leader.
     *
     * <p>May be called more than once and from any thread. Failures while closing the socket
     * are logged and otherwise ignored.
     */
    public void shutdown() {
        // Enqueue the sentinel with offer(): the queue is unbounded, so this never blocks,
        // and unlike put() it is not refused when the calling thread's interrupt flag is
        // already set (for example after an earlier shutdown() interrupted this thread).
        this.queuedPackets.offer(this.proposalOfDeath);
        try {
            if (this.sock != null && !this.sock.isClosed()) {
                this.sock.close();
            }
        }
        catch (IOException e) {
            LOG.warn("Ignoring unexpected exception during socket close", e);
        }
        this.interrupt();
        this.leader.removeLearnerHandler(this);
    }

    /**
     * @return the leader tick after which {@link #synced()} stops reporting this learner as
     *         synced
     */
    public long tickOfNextAckDeadline() {
        return tickOfNextAckDeadline;
    }

    /**
     * Queues a PING (type 5) carrying the leader's last proposed zxid, unless the sync-limit
     * check reports that an outstanding proposal has gone unacknowledged for too long. In
     * that case the connection is shut down instead.
     */
    public void ping() {
        if (!this.syncLimitCheck.check(System.nanoTime())) {
            LOG.warn("Closing connection to peer due to transaction timeout.");
            this.shutdown();
            return;
        }
        // lastProposed is read while holding the leader's monitor.
        final long lastProposedZxid;
        synchronized (this.leader) {
            lastProposedZxid = this.leader.lastProposed;
        }
        this.queuePacket(new QuorumPacket(5, lastProposedZxid, null, null));
    }

    /**
     * Appends a packet to the outgoing queue drained by the sender thread.
     *
     * @param p packet to send to the learner
     */
    void queuePacket(QuorumPacket p) {
        queuedPackets.add(p);
    }

    /**
     * @return {@code true} while this handler thread is alive and the leader's current tick
     *         has not passed {@link #tickOfNextAckDeadline}
     */
    public boolean synced() {
        if (!this.isAlive()) {
            return false;
        }
        long currentTick = (long)this.leader.self.tick.get();
        return currentTick <= this.tickOfNextAckDeadline;
    }

    /**
     * Tracks how long the oldest unacknowledged proposal has been outstanding.
     *
     * <p>Two slots are kept: the "current" proposal (zxid plus send time) and the most recent
     * "next" proposal sent while the current one was still pending. A send time of 0 means
     * the slot is empty. All methods synchronize on this object.
     */
    private class SyncLimitCheck {
        private boolean started = false;
        private long currentZxid = 0L;
        private long currentTime = 0L;
        private long nextZxid = 0L;
        private long nextTime = 0L;

        private SyncLimitCheck() {
        }

        /** Enables tracking; proposals reported before this call are ignored. */
        public synchronized void start() {
            started = true;
        }

        /**
         * Records a sent proposal. It fills the current slot when that slot is empty;
         * otherwise it overwrites the next slot.
         *
         * @param zxid zxid of the proposal
         * @param time send time, on the same clock later passed to {@link #check(long)}
         */
        public synchronized void updateProposal(long zxid, long time) {
            if (started) {
                boolean currentSlotEmpty = currentTime == 0L;
                if (currentSlotEmpty) {
                    currentTime = time;
                    currentZxid = zxid;
                } else {
                    nextTime = time;
                    nextZxid = zxid;
                }
            }
        }

        /**
         * Records an acknowledgement. An ack for the current proposal promotes the next slot
         * to current; an ack for the next proposal only clears the next slot (and is logged
         * as out of order); any other zxid is ignored.
         *
         * @param zxid zxid being acknowledged
         */
        public synchronized void updateAck(long zxid) {
            if (zxid == currentZxid) {
                currentTime = nextTime;
                currentZxid = nextZxid;
                clearNextSlot();
                return;
            }
            if (zxid == nextZxid) {
                LOG.warn("ACK for " + zxid + " received before ACK for " + this.currentZxid + "!!!!");
                clearNextSlot();
            }
        }

        /**
         * @param time current time in nanoseconds (the difference is divided by 1,000,000 to
         *             obtain milliseconds)
         * @return {@code true} when no proposal is outstanding, or when the current one has
         *         been outstanding for fewer than {@code tickTime * syncLimit} milliseconds
         */
        public synchronized boolean check(long time) {
            if (currentTime != 0L) {
                long elapsedMs = (time - currentTime) / 1000000L;
                long limitMs = (long)(LearnerHandler.this.leader.self.tickTime * LearnerHandler.this.leader.self.syncLimit);
                return elapsedMs < limitMs;
            }
            return true;
        }

        /** Empties the next slot. Only called from synchronized methods of this class. */
        private void clearNextSlot() {
            nextTime = 0L;
            nextZxid = 0L;
        }
    }
}

