/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.storage;

import io.confluent.common.metrics.JmxReporter;
import io.confluent.common.metrics.MeasurableStat;
import io.confluent.common.metrics.MetricConfig;
import io.confluent.common.metrics.MetricName;
import io.confluent.common.metrics.Metrics;
import io.confluent.common.metrics.MetricsReporter;
import io.confluent.common.metrics.Sensor;
import io.confluent.common.metrics.stats.Gauge;
import io.confluent.common.utils.SystemTime;
import io.confluent.common.utils.Time;
import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityLevel;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroUtils;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ModeUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.client.rest.utils.UrlList;
import io.confluent.kafka.schemaregistry.exceptions.IdDoesNotMatchException;
import io.confluent.kafka.schemaregistry.exceptions.IdGenerationException;
import io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException;
import io.confluent.kafka.schemaregistry.exceptions.InvalidSchemaException;
import io.confluent.kafka.schemaregistry.exceptions.OperationNotPermittedException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryRequestForwardingException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException;
import io.confluent.kafka.schemaregistry.exceptions.UnknownMasterException;
import io.confluent.kafka.schemaregistry.id.IdGenerator;
import io.confluent.kafka.schemaregistry.id.IncrementalIdGenerator;
import io.confluent.kafka.schemaregistry.id.ZookeeperIdGenerator;
import io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector;
import io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SslFactory;
import io.confluent.kafka.schemaregistry.rest.VersionId;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.storage.ClearSubjectKey;
import io.confluent.kafka.schemaregistry.storage.ClearSubjectValue;
import io.confluent.kafka.schemaregistry.storage.ConfigKey;
import io.confluent.kafka.schemaregistry.storage.ConfigValue;
import io.confluent.kafka.schemaregistry.storage.DeleteSubjectKey;
import io.confluent.kafka.schemaregistry.storage.DeleteSubjectValue;
import io.confluent.kafka.schemaregistry.storage.InMemoryCache;
import io.confluent.kafka.schemaregistry.storage.KafkaStore;
import io.confluent.kafka.schemaregistry.storage.KafkaStoreMessageHandler;
import io.confluent.kafka.schemaregistry.storage.LookupCache;
import io.confluent.kafka.schemaregistry.storage.MasterAwareSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.MasterElector;
import io.confluent.kafka.schemaregistry.storage.Mode;
import io.confluent.kafka.schemaregistry.storage.ModeKey;
import io.confluent.kafka.schemaregistry.storage.ModeValue;
import io.confluent.kafka.schemaregistry.storage.NoopKey;
import io.confluent.kafka.schemaregistry.storage.SchemaIdAndSubjects;
import io.confluent.kafka.schemaregistry.storage.SchemaKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryIdentity;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryValue;
import io.confluent.kafka.schemaregistry.storage.SchemaValue;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
import io.confluent.rest.Application;
import io.confluent.rest.exceptions.RestException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.reflect.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSchemaRegistry
implements SchemaRegistry,
MasterAwareSchemaRegistry {
    public static final int MIN_VERSION = 1;
    public static final int MAX_VERSION = Integer.MAX_VALUE;
    private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class);
    private final SchemaRegistryConfig config;
    private final LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache;
    final KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore;
    private final Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer;
    private final SchemaRegistryIdentity myIdentity;
    private final Object masterLock = new Object();
    private final AvroCompatibilityLevel defaultCompatibilityLevel;
    private final Mode defaultMode;
    private final int kafkaStoreTimeoutMs;
    private final int initTimeout;
    private final boolean isEligibleForMasterElector;
    private final boolean allowModeChanges;
    private SchemaRegistryIdentity masterIdentity;
    private RestService masterRestService;
    private SslFactory sslFactory;
    private IdGenerator idGenerator = null;
    private MasterElector masterElector = null;
    private Metrics metrics;
    private Sensor masterNodeSensor;

    public KafkaSchemaRegistry(SchemaRegistryConfig config, Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer) throws SchemaRegistryException {
        this.config = config;
        String host = config.getString("host.name");
        SchemeAndPort schemeAndPort = KafkaSchemaRegistry.getSchemeAndPortForIdentity(config.getInt("port"), config.getList("listeners"), config.interInstanceProtocol());
        this.isEligibleForMasterElector = config.getBoolean("master.eligibility");
        this.allowModeChanges = config.getBoolean("mode.mutability");
        this.myIdentity = new SchemaRegistryIdentity(host, schemeAndPort.port, this.isEligibleForMasterElector, schemeAndPort.scheme);
        this.sslFactory = new SslFactory(config);
        this.kafkaStoreTimeoutMs = config.getInt("kafkastore.timeout.ms");
        this.initTimeout = config.getInt("kafkastore.init.timeout.ms");
        this.serializer = serializer;
        this.defaultCompatibilityLevel = config.compatibilityType();
        this.defaultMode = Mode.READWRITE;
        this.lookupCache = this.lookupCache();
        this.idGenerator = this.identityGenerator(config);
        this.kafkaStore = this.kafkaStore(config);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS);
        List reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
        String jmxPrefix = "kafka.schema.registry";
        reporters.add(new JmxReporter(jmxPrefix));
        this.metrics = new Metrics(metricConfig, reporters, (Time)new SystemTime());
        this.masterNodeSensor = this.metrics.sensor("master-slave-role");
        Map configuredTags = config.getMap("metrics.tag.map");
        MetricName m = new MetricName("master-slave-role", "master-slave-role", "1.0 indicates the node is the active master in the cluster and is the node where all register schema and config update requests are served.", configuredTags);
        this.masterNodeSensor.add(m, (MeasurableStat)new Gauge());
    }

    protected KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore(SchemaRegistryConfig config) throws SchemaRegistryException {
        return new KafkaStore<SchemaRegistryKey, SchemaRegistryValue>(config, new KafkaStoreMessageHandler(this, this.lookupCache, this.idGenerator), this.serializer, this.lookupCache, new NoopKey());
    }

    protected LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache() {
        return new InMemoryCache<SchemaRegistryKey, SchemaRegistryValue>();
    }

    protected IdGenerator identityGenerator(SchemaRegistryConfig config) {
        IdGenerator idGenerator = config.useKafkaCoordination() ? new IncrementalIdGenerator() : new ZookeeperIdGenerator();
        idGenerator.configure(config);
        return idGenerator;
    }

    static SchemeAndPort getSchemeAndPortForIdentity(int port, List<String> configuredListeners, String requestedScheme) throws SchemaRegistryException {
        List listeners = Application.parseListeners(configuredListeners, (int)port, Arrays.asList("http", "https"), (String)"http");
        if (requestedScheme.isEmpty()) {
            requestedScheme = "http";
        }
        for (URI listener : listeners) {
            if (!requestedScheme.equalsIgnoreCase(listener.getScheme())) continue;
            return new SchemeAndPort(listener.getScheme(), listener.getPort());
        }
        throw new SchemaRegistryException(" No listener configured with requested scheme " + requestedScheme);
    }

    @Override
    public void init() throws SchemaRegistryException {
        try {
            this.kafkaStore.init();
        }
        catch (StoreInitializationException e) {
            throw new SchemaRegistryInitializationException("Error initializing kafka store while initializing schema registry", e);
        }
        try {
            if (this.config.useKafkaCoordination()) {
                log.info("Joining schema registry with Kafka-based coordination");
                this.masterElector = new KafkaGroupMasterElector(this.config, this.myIdentity, this);
            } else {
                log.info("Joining schema registry with Zookeeper-based coordination");
                this.masterElector = new ZookeeperMasterElector(this.config, this.myIdentity, this);
            }
            this.masterElector.init();
        }
        catch (SchemaRegistryStoreException e) {
            throw new SchemaRegistryInitializationException("Error electing master while initializing schema registry", e);
        }
        catch (SchemaRegistryTimeoutException e) {
            throw new SchemaRegistryInitializationException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isMaster() {
        Object object = this.masterLock;
        synchronized (object) {
            return this.masterIdentity != null && this.masterIdentity.equals(this.myIdentity);
            {
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setMaster(@Nullable SchemaRegistryIdentity newMaster) throws SchemaRegistryTimeoutException, SchemaRegistryStoreException, IdGenerationException {
        log.debug("Setting the master to " + newMaster);
        if (newMaster != null && !newMaster.getMasterEligibility()) {
            throw new IllegalStateException("Tried to set an ineligible node to master: " + newMaster);
        }
        Object object = this.masterLock;
        synchronized (object) {
            SchemaRegistryIdentity previousMaster = this.masterIdentity;
            this.masterIdentity = newMaster;
            if (this.masterIdentity == null) {
                this.masterRestService = null;
            } else {
                this.masterRestService = new RestService(this.masterIdentity.getUrl());
                if (this.sslFactory != null && this.sslFactory.sslContext() != null) {
                    this.masterRestService.setSslSocketFactory(this.sslFactory.sslContext().getSocketFactory());
                }
            }
            if (this.masterIdentity != null && !this.masterIdentity.equals(previousMaster) && this.isMaster()) {
                this.kafkaStore.markLastWrittenOffsetInvalid();
                try {
                    this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(this.initTimeout);
                }
                catch (StoreException e) {
                    throw new SchemaRegistryStoreException("Exception getting latest offset ", e);
                }
                this.idGenerator.init();
            }
            this.masterNodeSensor.record(this.isMaster() ? 1.0 : 0.0);
        }
    }

    public SchemaRegistryIdentity myIdentity() {
        return this.myIdentity;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public SchemaRegistryIdentity masterIdentity() {
        Object object = this.masterLock;
        synchronized (object) {
            return this.masterIdentity;
        }
    }

    @Override
    public int register(String subject, io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema) throws SchemaRegistryException {
        try {
            this.checkRegisterMode(subject, schema);
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(this.kafkaStoreTimeoutMs);
            int schemaId = schema.getId();
            SchemaIdAndSubjects schemaIdAndSubjects = this.lookupCache.schemaIdAndSubjects(schema);
            if (schemaIdAndSubjects != null) {
                if (schemaId >= 0 && schemaId != schemaIdAndSubjects.getSchemaId()) {
                    throw new IdDoesNotMatchException(schemaIdAndSubjects.getSchemaId(), schema.getId());
                }
                if (schemaIdAndSubjects.hasSubject(subject) && !this.isSubjectVersionDeleted(subject, schemaIdAndSubjects.getVersion(subject))) {
                    return schemaIdAndSubjects.getSchemaId();
                }
                schemaId = schemaIdAndSubjects.getSchemaId();
            }
            Iterator<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> allVersions = this.getAllVersions(subject, true);
            Iterator<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> undeletedVersions = this.getAllVersions(subject, false);
            ArrayList<String> undeletedSchemasList = new ArrayList<String>();
            io.confluent.kafka.schemaregistry.client.rest.entities.Schema latestSchema = null;
            int newVersion = 1;
            while (allVersions.hasNext()) {
                newVersion = allVersions.next().getVersion() + 1;
            }
            while (undeletedVersions.hasNext()) {
                latestSchema = undeletedVersions.next();
                undeletedSchemasList.add(latestSchema.getSchema());
            }
            AvroSchema avroSchema = this.canonicalizeSchema(schema);
            if (latestSchema == null || this.isCompatible(subject, avroSchema.canonicalString, undeletedSchemasList)) {
                if (schema.getVersion() <= 0) {
                    schema.setVersion(Integer.valueOf(newVersion));
                }
                if (schemaId >= 0) {
                    schema.setId(Integer.valueOf(schemaId));
                } else {
                    int newId = this.idGenerator.id(schema);
                    if (this.lookupCache.schemaKeyById(newId) != null) {
                        throw new SchemaRegistryStoreException("Error while registering the schema due to generating an ID that is already in use.");
                    }
                    schema.setId(Integer.valueOf(newId));
                }
                SchemaValue schemaValue = new SchemaValue(schema);
                this.kafkaStore.put(new SchemaKey(subject, schema.getVersion()), schemaValue);
                return schema.getId();
            }
            throw new IncompatibleSchemaException("New schema is incompatible with an earlier schema.");
        }
        catch (StoreTimeoutException te) {
            throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while registering the schema in the backend Kafka store", e);
        }
    }

    private void checkRegisterMode(String subject, io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema) throws OperationNotPermittedException, SchemaRegistryStoreException {
        if (this.getModeInScope(subject) == Mode.READONLY) {
            throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
        }
        if (schema.getId() >= 0 || schema.getVersion() > 0) {
            if (this.getModeInScope(subject) != Mode.IMPORT) {
                throw new OperationNotPermittedException("Subject " + subject + " is not in import mode");
            }
        } else if (this.getModeInScope(subject) != Mode.READWRITE) {
            throw new OperationNotPermittedException("Subject " + subject + " is not in read-write mode");
        }
    }

    public int registerOrForward(String subject, io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema, Map<String, String> headerProperties) throws SchemaRegistryException {
        io.confluent.kafka.schemaregistry.client.rest.entities.Schema existingSchema = this.lookUpSchemaUnderSubject(subject, schema, false);
        if (existingSchema != null) {
            if (schema.getId() != null && schema.getId() >= 0 && !schema.getId().equals(existingSchema.getId())) {
                throw new IdDoesNotMatchException(existingSchema.getId(), schema.getId());
            }
            return existingSchema.getId();
        }
        Object object = this.masterLock;
        synchronized (object) {
            if (this.isMaster()) {
                return this.register(subject, schema);
            }
            if (this.masterIdentity != null) {
                return this.forwardRegisterRequestToMaster(subject, schema, headerProperties);
            }
            throw new UnknownMasterException("Register schema request failed since master is unknown");
        }
    }

    @Override
    public void deleteSchemaVersion(String subject, io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema) throws SchemaRegistryException {
        try {
            if (this.getModeInScope(subject) == Mode.READONLY) {
                throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
            }
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(this.kafkaStoreTimeoutMs);
            SchemaValue schemaValue = new SchemaValue(schema);
            schemaValue.setDeleted(true);
            this.kafkaStore.put(new SchemaKey(subject, schema.getVersion()), schemaValue);
            if (!this.getAllVersions(subject, false).hasNext()) {
                if (this.getMode(subject) != null) {
                    this.deleteMode(subject);
                }
                if (this.getCompatibilityLevel(subject) != null) {
                    this.deleteSubjectCompatibility(subject);
                }
            }
        }
        catch (StoreTimeoutException te) {
            throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while deleting the schema in the backend Kafka store", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void deleteSchemaVersionOrForward(Map<String, String> headerProperties, String subject, io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema) throws SchemaRegistryException {
        Object object = this.masterLock;
        synchronized (object) {
            if (this.isMaster()) {
                this.deleteSchemaVersion(subject, schema);
            } else if (this.masterIdentity != null) {
                this.forwardDeleteSchemaVersionRequestToMaster(headerProperties, subject, schema.getVersion());
            } else {
                throw new UnknownMasterException("Register schema request failed since master is unknown");
            }
        }
    }

    @Override
    public List<Integer> deleteSubject(String subject) throws SchemaRegistryException {
        try {
            if (this.getModeInScope(subject) == Mode.READONLY) {
                throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
            }
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(this.kafkaStoreTimeoutMs);
            ArrayList<Integer> deletedVersions = new ArrayList<Integer>();
            int deleteWatermarkVersion = 0;
            Iterator<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> schemasToBeDeleted = this.getAllVersions(subject, false);
            while (schemasToBeDeleted.hasNext()) {
                deleteWatermarkVersion = schemasToBeDeleted.next().getVersion();
                deletedVersions.add(deleteWatermarkVersion);
            }
            DeleteSubjectKey key = new DeleteSubjectKey(subject);
            DeleteSubjectValue value = new DeleteSubjectValue(subject, deleteWatermarkVersion);
            this.kafkaStore.put(key, value);
            if (this.getMode(subject) != null) {
                this.deleteMode(subject);
            }
            if (this.getCompatibilityLevel(subject) != null) {
                this.deleteSubjectCompatibility(subject);
            }
            return deletedVersions;
        }
        catch (StoreTimeoutException te) {
            throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while deleting the subject in the backend Kafka store", e);
        }
    }

    public List<Integer> deleteSubjectOrForward(Map<String, String> requestProperties, String subject) throws SchemaRegistryException {
        Object object = this.masterLock;
        synchronized (object) {
            if (this.isMaster()) {
                return this.deleteSubject(subject);
            }
            if (this.masterIdentity != null) {
                return this.forwardDeleteSubjectRequestToMaster(requestProperties, subject);
            }
            throw new UnknownMasterException("Register schema request failed since master is unknown");
        }
    }

    @Override
    public io.confluent.kafka.schemaregistry.client.rest.entities.Schema lookUpSchemaUnderSubject(String subject, io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema, boolean lookupDeletedSchema) throws SchemaRegistryException {
        this.canonicalizeSchema(schema);
        SchemaIdAndSubjects schemaIdAndSubjects = this.lookupCache.schemaIdAndSubjects(schema);
        if (schemaIdAndSubjects != null) {
            if (schemaIdAndSubjects.hasSubject(subject) && (lookupDeletedSchema || !this.isSubjectVersionDeleted(subject, schemaIdAndSubjects.getVersion(subject)))) {
                io.confluent.kafka.schemaregistry.client.rest.entities.Schema matchingSchema = new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(subject, Integer.valueOf(schemaIdAndSubjects.getVersion(subject)), Integer.valueOf(schemaIdAndSubjects.getSchemaId()), schema.getSchema());
                return matchingSchema;
            }
            return null;
        }
        return null;
    }

    private int forwardRegisterRequestToMaster(String subject, io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.masterRestService.getBaseUrls();
        RegisterSchemaRequest registerSchemaRequest = new RegisterSchemaRequest();
        registerSchemaRequest.setSchema(schema.getSchema());
        registerSchemaRequest.setVersion(schema.getVersion());
        registerSchemaRequest.setId(schema.getId());
        log.debug(String.format("Forwarding registering schema request %s to %s", registerSchemaRequest, baseUrl));
        try {
            int id = this.masterRestService.registerSchema(headerProperties, registerSchemaRequest, subject);
            return id;
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the registering schema request %s to %s", registerSchemaRequest, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private void forwardUpdateCompatibilityLevelRequestToMaster(String subject, AvroCompatibilityLevel compatibilityLevel, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.masterRestService.getBaseUrls();
        ConfigUpdateRequest configUpdateRequest = new ConfigUpdateRequest();
        configUpdateRequest.setCompatibilityLevel(compatibilityLevel.name);
        log.debug(String.format("Forwarding update config request %s to %s", configUpdateRequest, baseUrl));
        try {
            this.masterRestService.updateConfig(headerProperties, configUpdateRequest, subject);
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the update config request %s to %s", configUpdateRequest, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private void forwardDeleteSchemaVersionRequestToMaster(Map<String, String> headerProperties, String subject, Integer version) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.masterRestService.getBaseUrls();
        log.debug(String.format("Forwarding deleteSchemaVersion schema version request %s-%s to %s", subject, version, baseUrl));
        try {
            this.masterRestService.deleteSchemaVersion(headerProperties, subject, String.valueOf(version));
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding deleteSchemaVersion schema version request %s-%s to %s", subject, version, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private List<Integer> forwardDeleteSubjectRequestToMaster(Map<String, String> requestProperties, String subject) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.masterRestService.getBaseUrls();
        log.debug(String.format("Forwarding delete subject request for  %s to %s", subject, baseUrl));
        try {
            return this.masterRestService.deleteSubject(requestProperties, subject);
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding delete subject request %s to %s", subject, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private void forwardSetModeRequestToMaster(String subject, Mode mode, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException {
        UrlList baseUrl = this.masterRestService.getBaseUrls();
        ModeUpdateRequest modeUpdateRequest = new ModeUpdateRequest();
        modeUpdateRequest.setMode(mode.name());
        log.debug(String.format("Forwarding update mode request %s to %s", modeUpdateRequest, baseUrl));
        try {
            this.masterRestService.setMode(headerProperties, modeUpdateRequest, subject);
        }
        catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the update mode request %s to %s", modeUpdateRequest, baseUrl), e);
        }
        catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), (Throwable)e);
        }
    }

    private AvroSchema canonicalizeSchema(io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema) throws InvalidSchemaException {
        AvroSchema avroSchema = AvroUtils.parseSchema(schema.getSchema());
        if (avroSchema == null) {
            throw new InvalidSchemaException("Invalid schema " + schema.toString());
        }
        schema.setSchema(avroSchema.canonicalString);
        return avroSchema;
    }

    public io.confluent.kafka.schemaregistry.client.rest.entities.Schema validateAndGetSchema(String subject, VersionId versionId, boolean returnDeletedSchema) throws SchemaRegistryException {
        io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema = this.get(subject, versionId.getVersionId(), returnDeletedSchema);
        if (schema == null) {
            if (!this.listSubjects().contains(subject)) {
                throw Errors.subjectNotFoundException();
            }
            throw Errors.versionNotFoundException();
        }
        return schema;
    }

    @Override
    public io.confluent.kafka.schemaregistry.client.rest.entities.Schema get(String subject, int version, boolean returnDeletedSchema) throws SchemaRegistryException {
        VersionId versionId = new VersionId(version);
        if (versionId.isLatest()) {
            return this.getLatestVersion(subject);
        }
        SchemaKey key = new SchemaKey(subject, version);
        try {
            SchemaValue schemaValue = (SchemaValue)this.kafkaStore.get(key);
            io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema = null;
            if (schemaValue != null && !schemaValue.isDeleted() || returnDeletedSchema) {
                schema = this.getSchemaEntityFromSchemaValue(schemaValue);
            }
            return schema;
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema from the backend Kafka store", e);
        }
    }

    @Override
    public SchemaString get(int id) throws SchemaRegistryException {
        SchemaValue schema = null;
        try {
            SchemaKey subjectVersionKey = this.lookupCache.schemaKeyById(id);
            if (subjectVersionKey == null) {
                return null;
            }
            schema = (SchemaValue)this.kafkaStore.get(subjectVersionKey);
            if (schema == null) {
                return null;
            }
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema with id " + id + " from the backend Kafka store", e);
        }
        SchemaString schemaString = new SchemaString();
        schemaString.setSchemaString(schema.getSchema());
        return schemaString;
    }

    @Override
    public Set<String> listSubjects() throws SchemaRegistryException {
        try {
            Iterator<SchemaRegistryKey> allKeys = this.kafkaStore.getAllKeys();
            return this.extractUniqueSubjects(allKeys);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    private Set<String> extractUniqueSubjects(Iterator<SchemaRegistryKey> allKeys) throws StoreException {
        HashSet<String> subjects = new HashSet<String>();
        while (allKeys.hasNext()) {
            SchemaKey key;
            SchemaValue value;
            SchemaRegistryKey k = allKeys.next();
            if (!(k instanceof SchemaKey) || (value = (SchemaValue)this.kafkaStore.get(key = (SchemaKey)k)) == null || value.isDeleted()) continue;
            subjects.add(key.getSubject());
        }
        return subjects;
    }

    public boolean hasSubjects(String subject) throws SchemaRegistryStoreException {
        try {
            return this.lookupCache.hasSubjects(subject);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    @Override
    public Iterator<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> getAllVersions(String subject, boolean returnDeletedSchemas) throws SchemaRegistryException {
        try {
            SchemaKey key1 = new SchemaKey(subject, 1);
            SchemaKey key2 = new SchemaKey(subject, Integer.MAX_VALUE);
            Iterator<SchemaRegistryValue> allVersions = this.kafkaStore.getAll(key1, key2);
            return this.sortSchemasByVersion(allVersions, returnDeletedSchemas).iterator();
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    @Override
    public io.confluent.kafka.schemaregistry.client.rest.entities.Schema getLatestVersion(String subject) throws SchemaRegistryException {
        try {
            SchemaKey key1 = new SchemaKey(subject, 1);
            SchemaKey key2 = new SchemaKey(subject, Integer.MAX_VALUE);
            Iterator<SchemaRegistryValue> allVersions = this.kafkaStore.getAll(key1, key2);
            Vector<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> sortedVersions = this.sortSchemasByVersion(allVersions, false);
            io.confluent.kafka.schemaregistry.client.rest.entities.Schema latestSchema = null;
            if (sortedVersions.size() > 0) {
                latestSchema = sortedVersions.lastElement();
            }
            return latestSchema;
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    @Override
    public void close() {
        log.info("Shutting down schema registry");
        this.kafkaStore.close();
        if (this.masterElector != null) {
            this.masterElector.close();
        }
    }

    public void updateCompatibilityLevel(String subject, AvroCompatibilityLevel newCompatibilityLevel) throws SchemaRegistryStoreException, UnknownMasterException, OperationNotPermittedException {
        if (this.getModeInScope(subject) == Mode.READONLY) {
            throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
        }
        ConfigKey configKey = new ConfigKey(subject);
        try {
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(this.initTimeout);
            this.kafkaStore.put(configKey, new ConfigValue(newCompatibilityLevel));
            log.debug("Wrote new compatibility level: " + newCompatibilityLevel.name + " to the Kafka data store with key " + configKey.toString());
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Failed to write new config value to the store", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateConfigOrForward(String subject, AvroCompatibilityLevel newCompatibilityLevel, Map<String, String> headerProperties) throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, UnknownMasterException, OperationNotPermittedException {
        Object object = this.masterLock;
        synchronized (object) {
            if (this.isMaster()) {
                this.updateCompatibilityLevel(subject, newCompatibilityLevel);
            } else if (this.masterIdentity != null) {
                this.forwardUpdateCompatibilityLevelRequestToMaster(subject, newCompatibilityLevel, headerProperties);
            } else {
                throw new UnknownMasterException("Update config request failed since master is unknown");
            }
        }
    }

    public AvroCompatibilityLevel getCompatibilityLevel(String subject) throws SchemaRegistryStoreException {
        return this.lookupCache.compatibilityLevel(subject, false, this.defaultCompatibilityLevel);
    }

    private AvroCompatibilityLevel getCompatibilityLevelInScope(String subject) throws SchemaRegistryStoreException {
        return this.lookupCache.compatibilityLevel(subject, true, this.defaultCompatibilityLevel);
    }

    @Override
    public boolean isCompatible(String subject, String newSchemaObj, String latestSchema) throws SchemaRegistryException {
        if (latestSchema == null) {
            throw new InvalidSchemaException("Latest schema not provided");
        }
        return this.isCompatible(subject, newSchemaObj, Collections.singletonList(latestSchema));
    }

    @Override
    public boolean isCompatible(String subject, String newSchemaObj, List<String> previousSchemas) throws SchemaRegistryException {
        if (previousSchemas == null || previousSchemas.isEmpty()) {
            throw new InvalidSchemaException("Previous schema not provided");
        }
        ArrayList<Schema> previousAvroSchemas = new ArrayList<Schema>(previousSchemas.size());
        for (String previousSchema : previousSchemas) {
            if (previousSchema == null) {
                throw new InvalidSchemaException("Existing schema " + previousSchema + " is not a valid Avro schema");
            }
            AvroSchema previousAvroSchema = AvroUtils.parseSchema(previousSchema);
            previousAvroSchemas.add(previousAvroSchema.schemaObj);
        }
        AvroCompatibilityLevel compatibility = this.getCompatibilityLevelInScope(subject);
        return compatibility.compatibilityChecker.isCompatible(AvroUtils.parseSchema((String)newSchemaObj).schemaObj, previousAvroSchemas);
    }

    private void deleteMode(String subject) throws StoreException {
        ModeKey modeKey = new ModeKey(subject);
        this.kafkaStore.delete(modeKey);
    }

    private void deleteSubjectCompatibility(String subject) throws StoreException {
        ConfigKey configKey = new ConfigKey(subject);
        this.kafkaStore.delete(configKey);
    }

    public Mode getMode(String subject) throws SchemaRegistryStoreException {
        return this.lookupCache.mode(subject, false, this.defaultMode);
    }

    private Mode getModeInScope(String subject) throws SchemaRegistryStoreException {
        return this.lookupCache.mode(subject, true, this.defaultMode);
    }

    public void setMode(String subject, Mode mode) throws SchemaRegistryStoreException, OperationNotPermittedException {
        if (!this.allowModeChanges) {
            throw new OperationNotPermittedException("Mode changes are not allowed");
        }
        ModeKey modeKey = new ModeKey(subject);
        try {
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(this.initTimeout);
            if (mode == Mode.IMPORT && this.getMode(subject) != Mode.IMPORT) {
                if (this.hasSubjects(subject)) {
                    throw new OperationNotPermittedException("Cannot import since found existing subjects");
                }
                this.kafkaStore.put(new ClearSubjectKey(subject), new ClearSubjectValue(subject));
            }
            this.kafkaStore.put(modeKey, new ModeValue(mode));
            log.debug("Wrote new mode: " + mode.name() + " to the Kafka data store with key " + modeKey.toString());
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Failed to write new mode to the store", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setModeOrForward(String subject, Mode mode, Map<String, String> headerProperties) throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, OperationNotPermittedException, UnknownMasterException {
        Object object = this.masterLock;
        synchronized (object) {
            if (this.isMaster()) {
                this.setMode(subject, mode);
            } else if (this.masterIdentity != null) {
                this.forwardSetModeRequestToMaster(subject, mode, headerProperties);
            } else {
                throw new UnknownMasterException("Update mode request failed since master is unknown");
            }
        }
    }

    KafkaStore<SchemaRegistryKey, SchemaRegistryValue> getKafkaStore() {
        return this.kafkaStore;
    }

    private Vector<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> sortSchemasByVersion(Iterator<SchemaRegistryValue> schemas, boolean returnDeletedSchemas) {
        Vector<io.confluent.kafka.schemaregistry.client.rest.entities.Schema> schemaVector = new Vector<io.confluent.kafka.schemaregistry.client.rest.entities.Schema>();
        while (schemas.hasNext()) {
            SchemaValue schemaValue = (SchemaValue)schemas.next();
            if (!returnDeletedSchemas && schemaValue.isDeleted()) continue;
            schemaVector.add(this.getSchemaEntityFromSchemaValue(schemaValue));
        }
        Collections.sort(schemaVector);
        return schemaVector;
    }

    private io.confluent.kafka.schemaregistry.client.rest.entities.Schema getSchemaEntityFromSchemaValue(SchemaValue schemaValue) {
        if (schemaValue == null) {
            return null;
        }
        return new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(schemaValue.getSubject(), schemaValue.getVersion(), schemaValue.getId(), schemaValue.getSchema());
    }

    private boolean isSubjectVersionDeleted(String subject, int version) throws SchemaRegistryException {
        try {
            SchemaValue schemaValue = (SchemaValue)this.kafkaStore.get(new SchemaKey(subject, version));
            return schemaValue == null || schemaValue.isDeleted();
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema from the backend Kafka store", e);
        }
    }

    public static class SchemeAndPort {
        public int port;
        public String scheme;

        public SchemeAndPort(String scheme, int port) {
            this.port = port;
            this.scheme = scheme;
        }
    }
}

