/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.masterelector.zookeeper;

import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.MasterAwareSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.MasterElector;
import io.confluent.kafka.schemaregistry.storage.SchemaIdRange;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryIdentity;
import java.io.IOException;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class ZookeeperMasterElector
implements MasterElector {
    private static final Logger log = LoggerFactory.getLogger(ZookeeperMasterElector.class);
    private static final String MASTER_PATH = "/schema_registry_master";
    public static final int ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_SIZE = 20;
    public static final String ZOOKEEPER_SCHEMA_ID_COUNTER = "/schema_id_counter";
    private static final int ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_WRITE_RETRY_BACKOFF_MS = 50;
    private final boolean isEligibleForMasterElection;
    private final ZkClient zkClient;
    private final ZkUtils zkUtils;
    private final SchemaRegistryIdentity myIdentity;
    private final String myIdentityString;
    private final MasterAwareSchemaRegistry schemaRegistry;

    public ZookeeperMasterElector(SchemaRegistryConfig config, SchemaRegistryIdentity myIdentity, MasterAwareSchemaRegistry schemaRegistry) throws SchemaRegistryStoreException {
        this.isEligibleForMasterElection = myIdentity.getMasterEligibility();
        this.zkUtils = this.createZkNamespace(config);
        this.zkClient = this.zkUtils.zkClient();
        this.myIdentity = myIdentity;
        try {
            this.myIdentityString = myIdentity.toJson();
        }
        catch (IOException e) {
            throw new SchemaRegistryStoreException(String.format("Error while serializing schema registry identity %s to json", myIdentity.toString()), e);
        }
        this.schemaRegistry = schemaRegistry;
        this.zkClient.subscribeStateChanges((IZkStateListener)new SessionExpirationListener());
        this.zkClient.subscribeDataChanges(MASTER_PATH, (IZkDataListener)new MasterChangeListener());
    }

    @Override
    public void init() throws SchemaRegistryTimeoutException, SchemaRegistryStoreException, SchemaRegistryInitializationException {
        if (this.isEligibleForMasterElection) {
            this.electMaster();
        } else {
            this.readCurrentMaster();
        }
    }

    @Override
    public void close() {
        this.zkClient.unsubscribeAll();
        this.zkUtils.close();
    }

    public void electMaster() throws SchemaRegistryStoreException, SchemaRegistryTimeoutException, SchemaRegistryInitializationException {
        SchemaRegistryIdentity masterIdentity = null;
        try {
            this.zkUtils.createEphemeralPathExpectConflict(MASTER_PATH, this.myIdentityString, this.zkUtils.defaultAcls(MASTER_PATH));
            log.info("Successfully elected the new master: " + this.myIdentityString);
            masterIdentity = this.myIdentity;
            this.schemaRegistry.setMaster(masterIdentity);
        }
        catch (ZkNodeExistsException znee) {
            this.readCurrentMaster();
        }
    }

    public void readCurrentMaster() throws SchemaRegistryTimeoutException, SchemaRegistryStoreException, SchemaRegistryInitializationException {
        SchemaRegistryIdentity masterIdentity = null;
        try {
            String masterIdentityString = (String)this.zkUtils.readData(MASTER_PATH)._1();
            try {
                masterIdentity = SchemaRegistryIdentity.fromJson(masterIdentityString);
            }
            catch (IOException ioe) {
                log.error("Can't parse schema registry identity json string " + masterIdentityString);
            }
        }
        catch (ZkNoNodeException zkNoNodeException) {
            // empty catch block
        }
        if (this.myIdentity.equals(masterIdentity)) {
            log.error("The node's identity is same as elected master. Check the ``listeners`` config or the ``host.name`` and the ``port`` config");
            throw new SchemaRegistryInitializationException("Invalid identity");
        }
        this.schemaRegistry.setMaster(masterIdentity);
    }

    @Override
    public SchemaIdRange nextRange() throws SchemaRegistryStoreException {
        int base = this.nextSchemaIdCounterBatch();
        return new SchemaIdRange(base, this.getInclusiveUpperBound(base));
    }

    private ZkUtils createZkNamespace(SchemaRegistryConfig config) {
        boolean zkAclsEnabled = config.checkZkAclConfig();
        String schemaRegistryZkNamespace = config.getString("schema.registry.zk.namespace");
        String srClusterZkUrl = config.getString("kafkastore.connection.url");
        int zkSessionTimeoutMs = config.getInt("kafkastore.zk.session.timeout.ms");
        int kafkaNamespaceIndex = srClusterZkUrl.indexOf("/");
        String zkConnForNamespaceCreation = kafkaNamespaceIndex > 0 ? srClusterZkUrl.substring(0, kafkaNamespaceIndex) : srClusterZkUrl;
        String schemaRegistryNamespace = "/" + schemaRegistryZkNamespace;
        String schemaRegistryZkUrl = zkConnForNamespaceCreation + schemaRegistryNamespace;
        ZkUtils zkUtilsForNamespaceCreation = ZkUtils.apply((String)zkConnForNamespaceCreation, (int)zkSessionTimeoutMs, (int)zkSessionTimeoutMs, (boolean)zkAclsEnabled);
        zkUtilsForNamespaceCreation.makeSurePersistentPathExists(schemaRegistryNamespace, zkUtilsForNamespaceCreation.defaultAcls(schemaRegistryNamespace));
        log.info("Created schema registry namespace " + zkConnForNamespaceCreation + schemaRegistryNamespace);
        zkUtilsForNamespaceCreation.close();
        return ZkUtils.apply((String)schemaRegistryZkUrl, (int)zkSessionTimeoutMs, (int)zkSessionTimeoutMs, (boolean)zkAclsEnabled);
    }

    private Integer nextSchemaIdCounterBatch() throws SchemaRegistryStoreException {
        int nextIdBatchLowerBound = 1;
        while (true) {
            if (!this.zkUtils.zkClient().exists(ZOOKEEPER_SCHEMA_ID_COUNTER)) {
                try {
                    nextIdBatchLowerBound = this.getNextBatchLowerBoundFromKafkaStore();
                    int nextIdBatchUpperBound = this.getInclusiveUpperBound(nextIdBatchLowerBound);
                    this.zkUtils.createPersistentPath(ZOOKEEPER_SCHEMA_ID_COUNTER, String.valueOf(nextIdBatchUpperBound), this.zkUtils.defaultAcls(ZOOKEEPER_SCHEMA_ID_COUNTER));
                    return nextIdBatchLowerBound;
                }
                catch (ZkNodeExistsException nextIdBatchUpperBound) {}
            } else {
                String nextIdBatchUpperBound;
                int newSchemaIdCounterDataVersion;
                Tuple2 counterValue = this.zkUtils.readData(ZOOKEEPER_SCHEMA_ID_COUNTER);
                String counterData = (String)counterValue._1();
                Stat counterStat = (Stat)counterValue._2();
                if (counterData == null) {
                    throw new SchemaRegistryStoreException("Failed to read schema id counter /schema_id_counter from zookeeper");
                }
                int zkIdCounterValue = Integer.parseInt(counterData);
                int zkNextIdBatchLowerBound = zkIdCounterValue + 1;
                if (zkIdCounterValue % 20 != 0) {
                    int fixedZkIdCounterValue = 20 * (1 + zkIdCounterValue / 20);
                    zkNextIdBatchLowerBound = fixedZkIdCounterValue + 1;
                    log.warn("Zookeeper schema id counter is not an integer multiple of id batch size. Zookeeper may have stale id counter data.\nzk id counter: " + zkIdCounterValue + "\nid batch size: " + 20);
                }
                if ((newSchemaIdCounterDataVersion = ((Integer)this.zkUtils.conditionalUpdatePersistentPath(ZOOKEEPER_SCHEMA_ID_COUNTER, nextIdBatchUpperBound = String.valueOf(this.getInclusiveUpperBound(nextIdBatchLowerBound = Math.max(zkNextIdBatchLowerBound, this.getNextBatchLowerBoundFromKafkaStore()))), counterStat.getVersion(), null)._2()).intValue()) >= 0) break;
            }
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException interruptedException) {}
        }
        return nextIdBatchLowerBound;
    }

    private int getNextBatchLowerBoundFromKafkaStore() {
        if (this.schemaRegistry.getMaxIdInKafkaStore() <= 0) {
            return 1;
        }
        int nextBatchLowerBound = 1 + this.schemaRegistry.getMaxIdInKafkaStore() / 20;
        return 1 + nextBatchLowerBound * 20;
    }

    private int getInclusiveUpperBound(int inclusiveLowerBound) {
        return inclusiveLowerBound + 20 - 1;
    }

    private class SessionExpirationListener
    implements IZkStateListener {
        public void handleStateChanged(Watcher.Event.KeeperState state) {
        }

        public void handleNewSession() throws Exception {
            if (ZookeeperMasterElector.this.isEligibleForMasterElection) {
                ZookeeperMasterElector.this.electMaster();
            } else {
                ZookeeperMasterElector.this.readCurrentMaster();
            }
        }

        public void handleSessionEstablishmentError(Throwable t) throws Exception {
            log.error("Failed to re-establish Zookeeper connection: ", t);
            throw new SchemaRegistryStoreException("Couldn't establish Zookeeper connection", t);
        }
    }

    private class MasterChangeListener
    implements IZkDataListener {
        public void handleDataChange(String dataPath, Object data) {
            try {
                if (ZookeeperMasterElector.this.isEligibleForMasterElection) {
                    ZookeeperMasterElector.this.electMaster();
                } else {
                    ZookeeperMasterElector.this.readCurrentMaster();
                }
            }
            catch (SchemaRegistryException e) {
                log.error("Error while reading the schema registry master", (Throwable)e);
            }
        }

        public void handleDataDeleted(String dataPath) throws Exception {
            if (ZookeeperMasterElector.this.isEligibleForMasterElection) {
                ZookeeperMasterElector.this.electMaster();
            } else {
                ZookeeperMasterElector.this.schemaRegistry.setMaster(null);
            }
        }
    }
}

