/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.storage;

import io.confluent.common.metrics.JmxReporter;
import io.confluent.common.metrics.MeasurableStat;
import io.confluent.common.metrics.MetricConfig;
import io.confluent.common.metrics.MetricName;
import io.confluent.common.metrics.Metrics;
import io.confluent.common.metrics.MetricsReporter;
import io.confluent.common.metrics.Sensor;
import io.confluent.common.metrics.stats.Gauge;
import io.confluent.common.utils.SystemTime;
import io.confluent.common.utils.Time;
import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityLevel;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroUtils;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.client.rest.utils.UrlList;
import io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException;
import io.confluent.kafka.schemaregistry.exceptions.InvalidSchemaException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryRequestForwardingException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException;
import io.confluent.kafka.schemaregistry.exceptions.UnknownMasterException;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.VersionId;
import io.confluent.kafka.schemaregistry.storage.ConfigKey;
import io.confluent.kafka.schemaregistry.storage.ConfigValue;
import io.confluent.kafka.schemaregistry.storage.InMemoryStore;
import io.confluent.kafka.schemaregistry.storage.KafkaStore;
import io.confluent.kafka.schemaregistry.storage.KafkaStoreMessageHandler;
import io.confluent.kafka.schemaregistry.storage.MD5;
import io.confluent.kafka.schemaregistry.storage.NoopKey;
import io.confluent.kafka.schemaregistry.storage.SchemaIdAndSubjects;
import io.confluent.kafka.schemaregistry.storage.SchemaKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryValue;
import io.confluent.kafka.schemaregistry.storage.SchemaValue;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
import io.confluent.kafka.schemaregistry.zookeeper.SchemaRegistryIdentity;
import io.confluent.kafka.schemaregistry.zookeeper.ZookeeperMasterElector;
import io.confluent.rest.Application;
import io.confluent.rest.exceptions.RestException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.avro.Schema;
import org.apache.avro.reflect.Nullable;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class KafkaSchemaRegistry
implements SchemaRegistry {
    public static final int MIN_VERSION = 1;
    public static final int MAX_VERSION = Integer.MAX_VALUE;
    public static final int ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_SIZE = 20;
    public static final String ZOOKEEPER_SCHEMA_ID_COUNTER = "/schema_id_counter";
    private static final int ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_WRITE_RETRY_BACKOFF_MS = 50;
    private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class);
    final Map<Integer, SchemaKey> guidToSchemaKey;
    final Map<MD5, SchemaIdAndSubjects> schemaHashToGuid;
    private final KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore;
    private final Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer;
    private final SchemaRegistryIdentity myIdentity;
    private final Object masterLock = new Object();
    private final AvroCompatibilityLevel defaultCompatibilityLevel;
    private final String schemaRegistryZkNamespace;
    private final String kafkaClusterZkUrl;
    private final int zkSessionTimeoutMs;
    private final int kafkaStoreTimeoutMs;
    private final boolean isEligibleForMasterElector;
    private String schemaRegistryZkUrl;
    private ZkUtils zkUtils;
    private SchemaRegistryIdentity masterIdentity;
    private RestService masterRestService;
    private ZookeeperMasterElector masterElector = null;
    private Metrics metrics;
    private Sensor masterNodeSensor;
    private boolean zkAclsEnabled;
    private int nextAvailableSchemaId;
    private int idBatchInclusiveUpperBound;
    private int maxIdInKafkaStore = -1;

    public KafkaSchemaRegistry(SchemaRegistryConfig config, Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer) throws SchemaRegistryException {
        String host = config.getString("host.name");
        int port = KafkaSchemaRegistry.getPortForIdentity(config.getInt("port"), config.getList("listeners"));
        this.schemaRegistryZkNamespace = config.getString("schema.registry.zk.namespace");
        this.isEligibleForMasterElector = config.getBoolean("master.eligibility");
        this.myIdentity = new SchemaRegistryIdentity(host, port, this.isEligibleForMasterElector);
        this.kafkaClusterZkUrl = config.getString("kafkastore.connection.url");
        this.zkSessionTimeoutMs = config.getInt("kafkastore.zk.session.timeout.ms");
        this.kafkaStoreTimeoutMs = config.getInt("kafkastore.timeout.ms");
        this.serializer = serializer;
        this.defaultCompatibilityLevel = config.compatibilityType();
        this.guidToSchemaKey = new HashMap<Integer, SchemaKey>();
        this.schemaHashToGuid = new HashMap<MD5, SchemaIdAndSubjects>();
        this.kafkaStore = new KafkaStore<NoopKey, SchemaRegistryValue>(config, new KafkaStoreMessageHandler(this), this.serializer, new InMemoryStore(), new NoopKey());
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS);
        List reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
        String jmxPrefix = "kafka.schema.registry";
        reporters.add(new JmxReporter(jmxPrefix));
        this.metrics = new Metrics(metricConfig, reporters, (Time)new SystemTime());
        this.masterNodeSensor = this.metrics.sensor("master-slave-role");
        MetricName m = new MetricName("master-slave-role", "master-slave-role", "1.0 indicates the node is the active master in the cluster and is the node where all register schema and config update requests are served.");
        this.masterNodeSensor.add(m, (MeasurableStat)new Gauge());
        this.zkAclsEnabled = KafkaSchemaRegistry.checkZkAclConfig(config);
    }

    public static boolean checkZkAclConfig(SchemaRegistryConfig config) {
        if (config.getBoolean("zookeeper.set.acl") && !JaasUtils.isZkSecurityEnabled()) {
            throw new ConfigException("zookeeper.set.acl is set to true but ZooKeeper's JAAS SASL configuration is not configured.");
        }
        return config.getBoolean("zookeeper.set.acl");
    }

    static int getPortForIdentity(int port, List<String> configuredListeners) {
        List listeners = Application.parseListeners(configuredListeners, (int)port, Arrays.asList("http", "https"), (String)"http");
        return ((URI)listeners.get(0)).getPort();
    }

    @Override
    public void init() throws SchemaRegistryInitializationException {
        try {
            this.kafkaStore.init();
        }
        catch (StoreInitializationException e) {
            throw new SchemaRegistryInitializationException("Error initializing kafka store while initializing schema registry", e);
        }
        try {
            this.createZkNamespace();
            this.masterElector = new ZookeeperMasterElector(this.zkUtils, this.myIdentity, this, this.isEligibleForMasterElector);
        }
        catch (SchemaRegistryStoreException e) {
            throw new SchemaRegistryInitializationException("Error electing master while initializing schema registry", e);
        }
        catch (SchemaRegistryTimeoutException e) {
            throw new SchemaRegistryInitializationException(e);
        }
    }

    private void createZkNamespace() {
        int kafkaNamespaceIndex = this.kafkaClusterZkUrl.indexOf("/");
        String zkConnForNamespaceCreation = kafkaNamespaceIndex > 0 ? this.kafkaClusterZkUrl.substring(0, kafkaNamespaceIndex) : this.kafkaClusterZkUrl;
        String schemaRegistryNamespace = "/" + this.schemaRegistryZkNamespace;
        this.schemaRegistryZkUrl = zkConnForNamespaceCreation + schemaRegistryNamespace;
        ZkUtils zkUtilsForNamespaceCreation = ZkUtils.apply((String)zkConnForNamespaceCreation, (int)this.zkSessionTimeoutMs, (int)this.zkSessionTimeoutMs, (boolean)this.zkAclsEnabled);
        zkUtilsForNamespaceCreation.makeSurePersistentPathExists(schemaRegistryNamespace, zkUtilsForNamespaceCreation.DefaultAcls());
        log.info("Created schema registry namespace " + zkConnForNamespaceCreation + schemaRegistryNamespace);
        zkUtilsForNamespaceCreation.close();
        this.zkUtils = ZkUtils.apply((String)this.schemaRegistryZkUrl, (int)this.zkSessionTimeoutMs, (int)this.zkSessionTimeoutMs, (boolean)this.zkAclsEnabled);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isMaster() {
        Object object = this.masterLock;
        synchronized (object) {
            return this.masterIdentity != null && this.masterIdentity.equals(this.myIdentity);
            {
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setMaster(@Nullable SchemaRegistryIdentity newMaster) throws SchemaRegistryTimeoutException, SchemaRegistryStoreException {
        log.debug("Setting the master to " + newMaster);
        if (newMaster != null && !newMaster.getMasterEligibility()) {
            throw new IllegalStateException("Tried to set an ineligible node to master: " + newMaster);
        }
        Object object = this.masterLock;
        synchronized (object) {
            SchemaRegistryIdentity previousMaster = this.masterIdentity;
            this.masterIdentity = newMaster;
            this.masterRestService = this.masterIdentity == null ? null : new RestService(String.format("http://%s:%d", this.masterIdentity.getHost(), this.masterIdentity.getPort()));
            if (this.masterIdentity != null && !this.masterIdentity.equals(previousMaster) && this.isMaster()) {
                this.nextAvailableSchemaId = this.nextSchemaIdCounterBatch();
                this.idBatchInclusiveUpperBound = this.getInclusiveUpperBound(this.nextAvailableSchemaId);
                this.kafkaStore.markLastWrittenOffsetInvalid();
            }
            this.masterNodeSensor.record(this.isMaster() ? 1.0 : 0.0);
        }
    }

    public SchemaRegistryIdentity myIdentity() {
        return this.myIdentity;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public SchemaRegistryIdentity masterIdentity() {
        Object object = this.masterLock;
        synchronized (object) {
            return this.masterIdentity;
        }
    }

    @Override
    public int register(String subject, io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema) throws SchemaRegistryException {
        try {
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(this.kafkaStoreTimeoutMs);
            AvroSchema avroSchema = this.canonicalizeSchema(schema);
            MD5 md5 = MD5.ofString(schema.getSchema());
            int schemaId = -1;
            if (this.schemaHashToGuid.containsKey(md5)) {
                SchemaIdAndSubjects schemaIdAndSubjects = this.schemaHashToGuid.get(md5);
                if (schemaIdAndSubjects.hasSubject(subject)) {
                    return schemaIdAndSubjects.getSchemaId();
                }
                schemaId = schemaIdAndSubjects.getSchemaId();
            }
            Iterator<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> allVersions = this.getAllVersions(subject);
            ArrayList<String> allSchemas = new ArrayList<String>();
            io.confluent.kafka.schemaregistry.client.rest.entities.Schema latestSchema = null;
            int newVersion = 1;
            while (allVersions.hasNext()) {
                latestSchema = allVersions.next();
                allSchemas.add(latestSchema.getSchema());
                newVersion = latestSchema.getVersion() + 1;
            }
            if (latestSchema == null || this.isCompatible(subject, avroSchema.canonicalString, allSchemas)) {
                SchemaKey keyForNewVersion = new SchemaKey(subject, newVersion);
                schema.setVersion(Integer.valueOf(newVersion));
                if (schemaId >= 0) {
                    schema.setId(Integer.valueOf(schemaId));
                } else {
                    schema.setId(Integer.valueOf(this.nextAvailableSchemaId));
                    ++this.nextAvailableSchemaId;
                }
                if (this.reachedEndOfIdBatch()) {
                    this.idBatchInclusiveUpperBound = this.getInclusiveUpperBound(this.nextSchemaIdCounterBatch());
                }
                SchemaValue schemaValue = new SchemaValue(schema);
                this.kafkaStore.put(keyForNewVersion, schemaValue);
                return schema.getId();
            }
            throw new IncompatibleSchemaException("New schema is incompatible with an earlier schema.");
        }
        catch (StoreTimeoutException te) {
            throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while registering the schema in the backend Kafka store", e);
        }
    }

    public int registerOrForward(String subject, io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema, Map<String, String> headerProperties) throws SchemaRegistryException {
        io.confluent.kafka.schemaregistry.client.rest.entities.Schema existingSchema = this.lookUpSchemaUnderSubject(subject, schema);
        if (existingSchema != null) {
            return existingSchema.getId();
        }
        Object object = this.masterLock;
        synchronized (object) {
            if (this.isMaster()) {
                return this.register(subject, schema);
            }
            if (this.masterIdentity != null) {
                return this.forwardRegisterRequestToMaster(subject, schema.getSchema(), headerProperties);
            }
            throw new UnknownMasterException("Register schema request failed since master is unknown");
        }
    }

    @Override
    public io.confluent.kafka.schemaregistry.client.rest.entities.Schema lookUpSchemaUnderSubject(String subject, io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema) throws SchemaRegistryException {
        this.canonicalizeSchema(schema);
        MD5 md5 = MD5.ofString(schema.getSchema());
        if (this.schemaHashToGuid.containsKey(md5)) {
            SchemaIdAndSubjects schemaIdAndSubjects = this.schemaHashToGuid.get(md5);
            if (schemaIdAndSubjects.hasSubject(subject)) {
                io.confluent.kafka.schemaregistry.client.rest.entities.Schema matchingSchema = new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(subject, Integer.valueOf(schemaIdAndSubjects.getVersion(subject)), Integer.valueOf(schemaIdAndSubjects.getSchemaId()), schema.getSchema());
                return matchingSchema;
            }
            return null;
        }
        return null;
    }

    private Integer nextSchemaIdCounterBatch() throws SchemaRegistryStoreException {
        int nextIdBatchLowerBound = 1;
        while (true) {
            if (!this.zkUtils.zkClient().exists(ZOOKEEPER_SCHEMA_ID_COUNTER)) {
                try {
                    nextIdBatchLowerBound = this.getNextBatchLowerBoundFromKafkaStore();
                    int nextIdBatchUpperBound = this.getInclusiveUpperBound(nextIdBatchLowerBound);
                    this.zkUtils.createPersistentPath(ZOOKEEPER_SCHEMA_ID_COUNTER, String.valueOf(nextIdBatchUpperBound), this.zkUtils.DefaultAcls());
                    return nextIdBatchLowerBound;
                }
                catch (ZkNodeExistsException nextIdBatchUpperBound) {}
            } else {
                String nextIdBatchUpperBound;
                int newSchemaIdCounterDataVersion;
                Tuple2 counterValue = this.zkUtils.readData(ZOOKEEPER_SCHEMA_ID_COUNTER);
                String counterData = (String)counterValue._1();
                Stat counterStat = (Stat)counterValue._2();
                if (counterData == null) {
                    throw new SchemaRegistryStoreException("Failed to read schema id counter /schema_id_counter from zookeeper");
                }
                int zkIdCounterValue = Integer.valueOf(counterData);
                int zkNextIdBatchLowerBound = zkIdCounterValue + 1;
                if (zkIdCounterValue % 20 != 0) {
                    int fixedZkIdCounterValue = 20 * (1 + zkIdCounterValue / 20);
                    zkNextIdBatchLowerBound = fixedZkIdCounterValue + 1;
                    log.warn("Zookeeper schema id counter is not an integer multiple of id batch size. Zookeeper may have stale id counter data.\nzk id counter: " + zkIdCounterValue + "\n" + "id batch size: " + 20);
                }
                if ((newSchemaIdCounterDataVersion = ((Integer)this.zkUtils.conditionalUpdatePersistentPath(ZOOKEEPER_SCHEMA_ID_COUNTER, nextIdBatchUpperBound = String.valueOf(this.getInclusiveUpperBound(nextIdBatchLowerBound = Math.max(zkNextIdBatchLowerBound, this.getNextBatchLowerBoundFromKafkaStore()))), counterStat.getVersion(), null)._2()).intValue()) >= 0) break;
            }
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException interruptedException) {}
        }
        return nextIdBatchLowerBound;
    }

    private int forwardRegisterRequestToMaster(String subject, String schemaString, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.masterRestService.getBaseUrls();
        RegisterSchemaRequest registerSchemaRequest = new RegisterSchemaRequest();
        registerSchemaRequest.setSchema(schemaString);
        log.debug(String.format("Forwarding registering schema request %s to %s", registerSchemaRequest, baseUrl));
        try {
            int id = this.masterRestService.registerSchema(headerProperties, registerSchemaRequest, subject);
            return id;
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the registering schema request %s to %s", registerSchemaRequest, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private void forwardUpdateCompatibilityLevelRequestToMaster(String subject, AvroCompatibilityLevel compatibilityLevel, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.masterRestService.getBaseUrls();
        ConfigUpdateRequest configUpdateRequest = new ConfigUpdateRequest();
        configUpdateRequest.setCompatibilityLevel(compatibilityLevel.name);
        log.debug(String.format("Forwarding update config request %s to %s", configUpdateRequest, baseUrl));
        try {
            this.masterRestService.updateConfig(headerProperties, configUpdateRequest, subject);
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the update config request %s to %s", configUpdateRequest, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private AvroSchema canonicalizeSchema(io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema) throws InvalidSchemaException {
        AvroSchema avroSchema = AvroUtils.parseSchema(schema.getSchema());
        if (avroSchema == null) {
            throw new InvalidSchemaException("Invalid schema " + schema.toString());
        }
        schema.setSchema(avroSchema.canonicalString);
        return avroSchema;
    }

    @Override
    public io.confluent.kafka.schemaregistry.client.rest.entities.Schema get(String subject, int version) throws SchemaRegistryException {
        VersionId versionId = new VersionId(version);
        if (versionId.isLatest()) {
            return this.getLatestVersion(subject);
        }
        SchemaKey key = new SchemaKey(subject, version);
        try {
            SchemaValue schemaValue = (SchemaValue)this.kafkaStore.get(key);
            io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema = this.getSchemaEntityFromSchemaValue(schemaValue);
            return schema;
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema from the backend Kafka store", e);
        }
    }

    @Override
    public SchemaString get(int id) throws SchemaRegistryException {
        SchemaValue schema = null;
        try {
            SchemaKey subjectVersionKey = this.guidToSchemaKey.get(id);
            if (subjectVersionKey == null) {
                return null;
            }
            schema = (SchemaValue)this.kafkaStore.get(subjectVersionKey);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema with id " + id + " from the backend Kafka" + " store", e);
        }
        SchemaString schemaString = new SchemaString();
        schemaString.setSchemaString(schema.getSchema());
        return schemaString;
    }

    @Override
    public Set<String> listSubjects() throws SchemaRegistryException {
        try {
            Iterator<SchemaRegistryKey> allKeys = this.kafkaStore.getAllKeys();
            return this.extractUniqueSubjects(allKeys);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    private Set<String> extractUniqueSubjects(Iterator<SchemaRegistryKey> allKeys) {
        HashSet<String> subjects = new HashSet<String>();
        while (allKeys.hasNext()) {
            SchemaRegistryKey k = allKeys.next();
            if (!(k instanceof SchemaKey)) continue;
            SchemaKey key = (SchemaKey)k;
            subjects.add(key.getSubject());
        }
        return subjects;
    }

    @Override
    public Iterator<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> getAllVersions(String subject) throws SchemaRegistryException {
        try {
            SchemaKey key1 = new SchemaKey(subject, 1);
            SchemaKey key2 = new SchemaKey(subject, Integer.MAX_VALUE);
            Iterator<SchemaRegistryValue> allVersions = this.kafkaStore.getAll(key1, key2);
            return this.sortSchemasByVersion(allVersions).iterator();
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    @Override
    public io.confluent.kafka.schemaregistry.client.rest.entities.Schema getLatestVersion(String subject) throws SchemaRegistryException {
        try {
            SchemaKey key1 = new SchemaKey(subject, 1);
            SchemaKey key2 = new SchemaKey(subject, Integer.MAX_VALUE);
            Iterator<SchemaRegistryValue> allVersions = this.kafkaStore.getAll(key1, key2);
            Vector<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> sortedVersions = this.sortSchemasByVersion(allVersions);
            io.confluent.kafka.schemaregistry.client.rest.entities.Schema latestSchema = null;
            if (sortedVersions.size() > 0) {
                latestSchema = sortedVersions.lastElement();
            }
            return latestSchema;
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    @Override
    public void close() {
        log.info("Shutting down schema registry");
        this.kafkaStore.close();
        if (this.masterElector != null) {
            this.masterElector.close();
        }
        if (this.zkUtils != null) {
            this.zkUtils.close();
        }
    }

    public void updateCompatibilityLevel(String subject, AvroCompatibilityLevel newCompatibilityLevel) throws SchemaRegistryStoreException, UnknownMasterException {
        ConfigKey configKey = new ConfigKey(subject);
        try {
            this.kafkaStore.put(configKey, new ConfigValue(newCompatibilityLevel));
            log.debug("Wrote new compatibility level: " + newCompatibilityLevel.name + " to the" + " Kafka data store with key " + configKey.toString());
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Failed to write new config value to the store", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateConfigOrForward(String subject, AvroCompatibilityLevel newCompatibilityLevel, Map<String, String> headerProperties) throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, UnknownMasterException {
        Object object = this.masterLock;
        synchronized (object) {
            if (this.isMaster()) {
                this.updateCompatibilityLevel(subject, newCompatibilityLevel);
            } else if (this.masterIdentity != null) {
                this.forwardUpdateCompatibilityLevelRequestToMaster(subject, newCompatibilityLevel, headerProperties);
            } else {
                throw new UnknownMasterException("Update config request failed since master is unknown");
            }
        }
    }

    public AvroCompatibilityLevel getCompatibilityLevel(String subject) throws SchemaRegistryStoreException {
        ConfigValue config;
        ConfigKey subjectConfigKey = new ConfigKey(subject);
        try {
            config = (ConfigValue)this.kafkaStore.get(subjectConfigKey);
            if (config == null && subject == null) {
                config = new ConfigValue(this.defaultCompatibilityLevel);
            } else if (config == null) {
                config = new ConfigValue();
            }
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Failed to read config from the kafka store", e);
        }
        return config.getCompatibilityLevel();
    }

    @Override
    public boolean isCompatible(String subject, String newSchemaObj, String latestSchema) throws SchemaRegistryException {
        if (latestSchema == null) {
            throw new InvalidSchemaException("Latest schema not provided");
        }
        return this.isCompatible(subject, newSchemaObj, Collections.singletonList(latestSchema));
    }

    @Override
    public boolean isCompatible(String subject, String newSchemaObj, List<String> previousSchemas) throws SchemaRegistryException {
        AvroSchema newAvroSchema = AvroUtils.parseSchema(newSchemaObj);
        if (previousSchemas == null || previousSchemas.isEmpty()) {
            throw new InvalidSchemaException("Previous schema not provided");
        }
        ArrayList<Schema> previousAvroSchemas = new ArrayList<Schema>(previousSchemas.size());
        for (String previousSchema : previousSchemas) {
            if (previousSchema == null) {
                throw new InvalidSchemaException("Existing schema " + previousSchema + " is not a valid Avro schema");
            }
            AvroSchema previousAvroSchema = AvroUtils.parseSchema(previousSchema);
            previousAvroSchemas.add(previousAvroSchema.schemaObj);
        }
        AvroCompatibilityLevel compatibility = this.getCompatibilityLevel(subject);
        if (compatibility == null) {
            compatibility = this.getCompatibilityLevel(null);
        }
        return compatibility.compatibilityChecker.isCompatible(newAvroSchema.schemaObj, previousAvroSchemas);
    }

    KafkaStore<SchemaRegistryKey, SchemaRegistryValue> getKafkaStore() {
        return this.kafkaStore;
    }

    private Vector<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> sortSchemasByVersion(Iterator<SchemaRegistryValue> schemas) {
        Vector<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> schemaVector = new Vector<io.confluent.kafka.schemaregistry.client.rest.entities.Schema>();
        while (schemas.hasNext()) {
            SchemaValue schemaValue = (SchemaValue)schemas.next();
            schemaVector.add(this.getSchemaEntityFromSchemaValue(schemaValue));
        }
        Collections.sort(schemaVector);
        return schemaVector;
    }

    private io.confluent.kafka.schemaregistry.client.rest.entities.Schema getSchemaEntityFromSchemaValue(SchemaValue schemaValue) {
        if (schemaValue == null) {
            return null;
        }
        return new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(schemaValue.getSubject(), schemaValue.getVersion(), schemaValue.getId(), schemaValue.getSchema());
    }

    int getMaxIdInKafkaStore() {
        return this.maxIdInKafkaStore;
    }

    void setMaxIdInKafkaStore(int id) {
        this.maxIdInKafkaStore = id;
    }

    private boolean reachedEndOfIdBatch() {
        return this.nextAvailableSchemaId > this.idBatchInclusiveUpperBound;
    }

    private int getNextBatchLowerBoundFromKafkaStore() {
        if (this.getMaxIdInKafkaStore() <= 0) {
            return 1;
        }
        int nextBatchLowerBound = 1 + this.getMaxIdInKafkaStore() / 20;
        return 1 + nextBatchLowerBound * 20;
    }

    private int getInclusiveUpperBound(int inclusiveLowerBound) {
        return inclusiveLowerBound + 20 - 1;
    }
}

