/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.UUID;
import javax.annotation.Nonnull;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionState;
import org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.CreateMode;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.data.Stat;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperLeaderElectionService
implements LeaderElectionService,
LeaderLatchListener,
NodeCacheListener,
UnhandledErrorListener {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);
    private final Object lock = new Object();
    private final CuratorFramework client;
    private final LeaderLatch leaderLatch;
    private final NodeCache cache;
    private final String leaderPath;
    private volatile UUID issuedLeaderSessionID;
    private volatile UUID confirmedLeaderSessionID;
    private volatile LeaderContender leaderContender;
    private volatile boolean running;
    private final ConnectionStateListener listener = new ConnectionStateListener(){

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            ZooKeeperLeaderElectionService.this.handleStateChange(newState);
        }
    };

    public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath, String leaderPath) {
        this.client = (CuratorFramework)Preconditions.checkNotNull((Object)client, (String)"CuratorFramework client");
        this.leaderPath = (String)Preconditions.checkNotNull((Object)leaderPath, (String)"leaderPath");
        this.leaderLatch = new LeaderLatch(client, latchPath);
        this.cache = new NodeCache(client, leaderPath);
        this.issuedLeaderSessionID = null;
        this.confirmedLeaderSessionID = null;
        this.leaderContender = null;
        this.running = false;
    }

    public UUID getLeaderSessionID() {
        return this.confirmedLeaderSessionID;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start(LeaderContender contender) throws Exception {
        Preconditions.checkNotNull((Object)contender, (String)"Contender must not be null.");
        Preconditions.checkState((this.leaderContender == null ? 1 : 0) != 0, (Object)"Contender was already set.");
        LOG.info("Starting ZooKeeperLeaderElectionService {}.", (Object)this);
        Object object = this.lock;
        synchronized (object) {
            this.client.getUnhandledErrorListenable().addListener(this);
            this.leaderContender = contender;
            this.leaderLatch.addListener(this);
            this.leaderLatch.start();
            this.cache.getListenable().addListener(this);
            this.cache.start();
            this.client.getConnectionStateListenable().addListener(this.listener);
            this.running = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (!this.running) {
                return;
            }
            this.running = false;
            this.confirmedLeaderSessionID = null;
            this.issuedLeaderSessionID = null;
        }
        LOG.info("Stopping ZooKeeperLeaderElectionService {}.", (Object)this);
        this.client.getUnhandledErrorListenable().removeListener(this);
        this.client.getConnectionStateListenable().removeListener(this.listener);
        Exception exception = null;
        try {
            this.cache.close();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, exception);
        }
        try {
            this.leaderLatch.close();
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        if (exception != null) {
            throw new Exception("Could not properly stop the ZooKeeperLeaderElectionService.", exception);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void confirmLeaderSessionID(UUID leaderSessionID) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Confirm leader session ID {} for leader {}.", (Object)leaderSessionID, (Object)this.leaderContender.getAddress());
        }
        Preconditions.checkNotNull((Object)leaderSessionID);
        if (this.leaderLatch.hasLeadership()) {
            Object object = this.lock;
            synchronized (object) {
                if (this.running) {
                    if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
                        this.confirmedLeaderSessionID = leaderSessionID;
                        this.writeLeaderInformation(this.confirmedLeaderSessionID);
                    }
                } else {
                    LOG.debug("Ignoring the leader session Id {} confirmation, since the ZooKeeperLeaderElectionService has already been stopped.", (Object)leaderSessionID);
                }
            }
        } else {
            LOG.warn("The leader session ID {} was confirmed even though the corresponding JobManager was not elected as the leader.", (Object)leaderSessionID);
        }
    }

    @Override
    public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
        return this.leaderLatch.hasLeadership() && leaderSessionId.equals(this.issuedLeaderSessionID);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void isLeader() {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                this.issuedLeaderSessionID = UUID.randomUUID();
                this.confirmedLeaderSessionID = null;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Grant leadership to contender {} with session ID {}.", (Object)this.leaderContender.getAddress(), (Object)this.issuedLeaderSessionID);
                }
                this.leaderContender.grantLeadership(this.issuedLeaderSessionID);
            } else {
                LOG.debug("Ignoring the grant leadership notification since the service has already been stopped.");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void notLeader() {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                this.issuedLeaderSessionID = null;
                this.confirmedLeaderSessionID = null;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Revoke leadership of {}.", (Object)this.leaderContender.getAddress());
                }
                this.leaderContender.revokeLeadership();
            } else {
                LOG.debug("Ignoring the revoke leadership notification since the service has already been stopped.");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void nodeChanged() throws Exception {
        block17: {
            try {
                if (!this.leaderLatch.hasLeadership()) break block17;
                Object object = this.lock;
                synchronized (object) {
                    if (this.running) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Leader node changed while {} is the leader with session ID {}.", (Object)this.leaderContender.getAddress(), (Object)this.confirmedLeaderSessionID);
                        }
                        if (this.confirmedLeaderSessionID != null) {
                            ChildData childData = this.cache.getCurrentData();
                            if (childData == null) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Writing leader information into empty node by {}.", (Object)this.leaderContender.getAddress());
                                }
                                this.writeLeaderInformation(this.confirmedLeaderSessionID);
                            } else {
                                byte[] data = childData.getData();
                                if (data == null || data.length == 0) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug("Writing leader information into node with empty data field by {}.", (Object)this.leaderContender.getAddress());
                                    }
                                    this.writeLeaderInformation(this.confirmedLeaderSessionID);
                                } else {
                                    ByteArrayInputStream bais = new ByteArrayInputStream(data);
                                    ObjectInputStream ois = new ObjectInputStream(bais);
                                    String leaderAddress = ois.readUTF();
                                    UUID leaderSessionID = (UUID)ois.readObject();
                                    if (!leaderAddress.equals(this.leaderContender.getAddress()) || leaderSessionID == null || !leaderSessionID.equals(this.confirmedLeaderSessionID)) {
                                        if (LOG.isDebugEnabled()) {
                                            LOG.debug("Correcting leader information by {}.", (Object)this.leaderContender.getAddress());
                                        }
                                        this.writeLeaderInformation(this.confirmedLeaderSessionID);
                                    }
                                }
                            }
                        }
                    } else {
                        LOG.debug("Ignoring node change notification since the service has already been stopped.");
                    }
                }
            }
            catch (Exception e) {
                this.leaderContender.handleError(new Exception("Could not handle node changed event.", e));
                throw e;
            }
        }
    }

    protected void writeLeaderInformation(UUID leaderSessionID) {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Write leader information: Leader={}, session ID={}.", (Object)this.leaderContender.getAddress(), (Object)leaderSessionID);
            }
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeUTF(this.leaderContender.getAddress());
            oos.writeObject(leaderSessionID);
            oos.close();
            boolean dataWritten = false;
            while (!dataWritten && this.leaderLatch.hasLeadership()) {
                Stat stat = (Stat)this.client.checkExists().forPath(this.leaderPath);
                if (stat != null) {
                    long sessionID;
                    long owner = stat.getEphemeralOwner();
                    if (owner == (sessionID = this.client.getZookeeperClient().getZooKeeper().getSessionId())) {
                        try {
                            this.client.setData().forPath(this.leaderPath, baos.toByteArray());
                            dataWritten = true;
                        }
                        catch (KeeperException.NoNodeException noNodeException) {}
                        continue;
                    }
                    try {
                        this.client.delete().forPath(this.leaderPath);
                    }
                    catch (KeeperException.NoNodeException noNodeException) {}
                    continue;
                }
                try {
                    ((ACLBackgroundPathAndBytesable)this.client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(this.leaderPath, baos.toByteArray());
                    dataWritten = true;
                }
                catch (KeeperException.NodeExistsException nodeExistsException) {}
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully wrote leader information: Leader={}, session ID={}.", (Object)this.leaderContender.getAddress(), (Object)leaderSessionID);
            }
        }
        catch (Exception e) {
            this.leaderContender.handleError(new Exception("Could not write leader address and leader session ID to ZooKeeper.", e));
        }
    }

    protected void handleStateChange(ConnectionState newState) {
        switch (newState) {
            case CONNECTED: {
                LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
                break;
            }
            case SUSPENDED: {
                LOG.warn("Connection to ZooKeeper suspended. The contender " + this.leaderContender.getAddress() + " no longer participates in the leader election.");
                break;
            }
            case RECONNECTED: {
                LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");
                break;
            }
            case LOST: {
                LOG.warn("Connection to ZooKeeper lost. The contender " + this.leaderContender.getAddress() + " no longer participates in the leader election.");
            }
        }
    }

    @Override
    public void unhandledError(String message, Throwable e) {
        this.leaderContender.handleError((Exception)new FlinkException("Unhandled error in ZooKeeperLeaderElectionService: " + message, e));
    }

    public String toString() {
        return "ZooKeeperLeaderElectionService{leaderPath='" + this.leaderPath + '\'' + '}';
    }
}

