/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.storage;

import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread;
import io.confluent.kafka.schemaregistry.storage.Store;
import io.confluent.kafka.schemaregistry.storage.StoreUpdateHandler;
import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.log.LogConfig;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterable;
import scala.collection.JavaConversions;
import scala.collection.Map;
import scala.collection.Seq;

/**
 * A {@link Store} that sends every write to a single-partition Kafka topic and answers reads
 * from a delegate local store.
 *
 * <p>Writes ({@link #put}, {@link #putAll}, {@link #delete}) are produced to partition 0 of the
 * configured topic and then block until the {@link KafkaStoreReaderThread} reports that it has
 * reached the offset of the produced record. Reads ({@link #get}, {@link #getAll},
 * {@link #getAllKeys}) are delegated directly to the local store.
 *
 * <p>The store must be initialized with {@link #init()} before any read or write is attempted;
 * otherwise those operations throw {@link StoreException}.
 *
 * @param <K> key type handled by the configured {@link Serializer}
 * @param <V> value type handled by the configured {@link Serializer}
 */
public class KafkaStore<K, V>
implements Store<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaStore.class);
    private static final Set<SecurityProtocol> SUPPORTED_SECURITY_PROTOCOLS = new HashSet<SecurityProtocol>(Arrays.asList(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL, SecurityProtocol.SASL_PLAINTEXT, SecurityProtocol.SASL_SSL));
    private final String kafkaClusterZkUrl;
    private final String topic;
    private final int desiredReplicationFactor;
    private final String groupId;
    private final StoreUpdateHandler<K, V> storeUpdateHandler;
    private final Serializer<K, V> serializer;
    private final Store<K, V> localStore;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final int initTimeout;
    private final int timeout;
    private final Seq<Broker> brokerSeq;
    private final String bootstrapBrokers;
    private final ZkUtils zkUtils;
    // Created in init(); both remain null until then, which close() has to tolerate.
    private KafkaProducer<byte[], byte[]> producer;
    private KafkaStoreReaderThread<K, V> kafkaTopicReader;
    private final K noopKey;
    // Offset of the most recent record this instance knows it wrote; -1 means "unknown".
    private volatile long lastWrittenOffset = -1L;
    private final SchemaRegistryConfig config;

    /**
     * Reads the store settings from {@code config}, opens a ZooKeeper client and resolves the
     * bootstrap broker list.
     *
     * <p>If {@code kafkastore.bootstrap.servers} is empty, the broker endpoints are taken from
     * the brokers registered in ZooKeeper. Endpoints that do not match
     * {@code kafkastore.security.protocol} are dropped.
     *
     * @param config             schema registry configuration
     * @param storeUpdateHandler handler passed to the reader thread
     * @param serializer         serializer used for keys and values written to the topic
     * @param localStore         store that serves reads
     * @param noopKey            key of the marker record written to discover the latest offset
     * @throws ConfigException if the security protocol is unsupported or no endpoint matches it
     */
    public KafkaStore(SchemaRegistryConfig config, StoreUpdateHandler<K, V> storeUpdateHandler, Serializer<K, V> serializer, Store<K, V> localStore, K noopKey) {
        this.kafkaClusterZkUrl = config.getString("kafkastore.connection.url");
        this.topic = config.getString("kafkastore.topic");
        this.desiredReplicationFactor = config.getInt("kafkastore.topic.replication.factor");
        int port = KafkaSchemaRegistry.getPortForIdentity(config.getInt("port"), config.getList("listeners"));
        this.groupId = String.format("schema-registry-%s-%d", config.getString("host.name"), port);
        this.initTimeout = config.getInt("kafkastore.init.timeout.ms");
        this.timeout = config.getInt("kafkastore.timeout.ms");
        this.storeUpdateHandler = storeUpdateHandler;
        this.serializer = serializer;
        this.localStore = localStore;
        this.noopKey = noopKey;
        this.config = config;
        int zkSessionTimeoutMs = config.getInt("kafkastore.zk.session.timeout.ms");
        this.zkUtils = ZkUtils.apply(this.kafkaClusterZkUrl, zkSessionTimeoutMs, zkSessionTimeoutMs, KafkaSchemaRegistry.checkZkAclConfig(this.config));

        Seq<Broker> brokers;
        String bootstrapServers;
        try {
            brokers = this.zkUtils.getAllBrokersInCluster();
            List<String> bootstrapServersConfig = config.getList("kafkastore.bootstrap.servers");
            List<String> endpoints = bootstrapServersConfig.isEmpty() ? KafkaStore.brokersToEndpoints(JavaConversions.seqAsJavaList(brokers)) : bootstrapServersConfig;
            bootstrapServers = KafkaStore.endpointsToBootstrapServers(endpoints, config.getString("kafkastore.security.protocol"));
        }
        catch (RuntimeException e) {
            // The caller never receives this instance, so nobody else can close the ZooKeeper
            // client that was just opened.
            try {
                this.zkUtils.close();
            }
            catch (RuntimeException closeError) {
                log.warn("Failed to close the ZooKeeper client after a KafkaStore construction error", closeError);
            }
            throw e;
        }
        this.brokerSeq = brokers;
        this.bootstrapBrokers = bootstrapServers;
        log.info("Initializing KafkaStore with broker endpoints: {}", this.bootstrapBrokers);
    }

    /**
     * Creates (or verifies) the backing topic, starts the producer and the reader thread, and
     * waits up to {@code kafkastore.init.timeout.ms} for the reader to reach the latest offset.
     *
     * @throws StoreInitializationException if the store was already initialized, the topic
     *         cannot be created, or the reader does not catch up in time
     */
    @Override
    public void init() throws StoreInitializationException {
        if (this.initialized.get()) {
            throw new StoreInitializationException("Illegal state while initializing store. Store was already initialized");
        }
        this.createSchemaTopic();

        Properties props = new Properties();
        props.put("bootstrap.servers", this.bootstrapBrokers);
        props.put("acks", "-1");
        props.put("key.serializer", ByteArraySerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        props.put("retries", 0);
        props.put("security.protocol", this.config.getString("kafkastore.security.protocol"));
        KafkaStore.addSecurityConfigsToClientProperties(this.config, props);
        this.producer = new KafkaProducer<byte[], byte[]>(props);

        this.kafkaTopicReader = new KafkaStoreReaderThread<K, V>(this.bootstrapBrokers, this.topic, this.groupId, this.storeUpdateHandler, this.serializer, this.localStore, this.noopKey, this.config);
        this.kafkaTopicReader.start();
        try {
            this.waitUntilKafkaReaderReachesLastOffset(this.initTimeout);
        }
        catch (StoreException e) {
            throw new StoreInitializationException(e);
        }

        // compareAndSet catches the case where another caller finished init() concurrently.
        if (!this.initialized.compareAndSet(false, true)) {
            throw new StoreInitializationException("Illegal state while initializing store. Store was already initialized");
        }
    }

    /**
     * Copies the SSL and SASL client settings that apply to the configured
     * {@code kafkastore.security.protocol} into {@code props}.
     *
     * @param config schema registry configuration to read from
     * @param props  Kafka client properties to populate
     */
    public static void addSecurityConfigsToClientProperties(SchemaRegistryConfig config, Properties props) {
        KafkaStore.addSslConfigsToClientProperties(config, props);
        KafkaStore.addSaslConfigsToClientProperties(config, props);
    }

    /**
     * Copies the {@code kafkastore.ssl.*} settings into {@code props} when the configured
     * security protocol is SSL or SASL_SSL; does nothing otherwise.
     *
     * @param config schema registry configuration to read from
     * @param props  Kafka client properties to populate
     */
    public static void addSslConfigsToClientProperties(SchemaRegistryConfig config, Properties props) {
        String protocol = config.getString("kafkastore.security.protocol");
        boolean usesSsl = protocol.equals(SecurityProtocol.SSL.toString()) || protocol.equals(SecurityProtocol.SASL_SSL.toString());
        if (!usesSsl) {
            return;
        }
        props.put("ssl.truststore.location", config.getString("kafkastore.ssl.truststore.location"));
        props.put("ssl.truststore.password", config.getString("kafkastore.ssl.truststore.password"));
        props.put("ssl.truststore.type", config.getString("kafkastore.ssl.truststore.type"));
        props.put("ssl.trustmanager.algorithm", config.getString("kafkastore.ssl.trustmanager.algorithm"));
        KafkaStore.putIfNotEmptyString("ssl.keystore.location", config.getString("kafkastore.ssl.keystore.location"), props);
        KafkaStore.putIfNotEmptyString("ssl.keystore.password", config.getString("kafkastore.ssl.keystore.password"), props);
        props.put("ssl.keystore.type", config.getString("kafkastore.ssl.keystore.type"));
        props.put("ssl.keymanager.algorithm", config.getString("kafkastore.ssl.keymanager.algorithm"));
        KafkaStore.putIfNotEmptyString("ssl.key.password", config.getString("kafkastore.ssl.key.password"), props);
        KafkaStore.putIfNotEmptyString("ssl.enabled.protocols", config.getString("kafkastore.ssl.enabled.protocols"), props);
        props.put("ssl.protocol", config.getString("kafkastore.ssl.protocol"));
        KafkaStore.putIfNotEmptyString("ssl.provider", config.getString("kafkastore.ssl.provider"), props);
        KafkaStore.putIfNotEmptyString("ssl.cipher.suites", config.getString("kafkastore.ssl.cipher.suites"), props);
        KafkaStore.putIfNotEmptyString("ssl.endpoint.identification.algorithm", config.getString("kafkastore.ssl.endpoint.identification.algorithm"), props);
    }

    /**
     * Copies the {@code kafkastore.sasl.*} settings into {@code props} when the configured
     * security protocol is SASL_PLAINTEXT or SASL_SSL; does nothing otherwise.
     *
     * @param config schema registry configuration to read from
     * @param props  Kafka client properties to populate
     */
    public static void addSaslConfigsToClientProperties(SchemaRegistryConfig config, Properties props) {
        String protocol = config.getString("kafkastore.security.protocol");
        boolean usesSasl = protocol.equals(SecurityProtocol.SASL_PLAINTEXT.toString()) || protocol.equals(SecurityProtocol.SASL_SSL.toString());
        if (!usesSasl) {
            return;
        }
        KafkaStore.putIfNotEmptyString("sasl.kerberos.service.name", config.getString("kafkastore.sasl.kerberos.service.name"), props);
        props.put("sasl.mechanism", config.getString("kafkastore.sasl.mechanism"));
        props.put("sasl.kerberos.kinit.cmd", config.getString("kafkastore.sasl.kerberos.kinit.cmd"));
        props.put("sasl.kerberos.min.time.before.relogin", config.getLong("kafkastore.sasl.kerberos.min.time.before.relogin"));
        props.put("sasl.kerberos.ticket.renew.jitter", config.getDouble("kafkastore.sasl.kerberos.ticket.renew.jitter"));
        props.put("sasl.kerberos.ticket.renew.window.factor", config.getDouble("kafkastore.sasl.kerberos.ticket.renew.window.factor"));
    }

    /**
     * Puts {@code value} under {@code parameter} unless it is null or blank after trimming.
     */
    private static void putIfNotEmptyString(String parameter, String value, Properties props) {
        if (value != null && !value.trim().isEmpty()) {
            props.put(parameter, value);
        }
    }

    /**
     * Ensures the backing topic exists. An existing topic is only verified; otherwise a
     * single-partition compacted topic is created with a replication factor capped at the
     * number of brokers seen when this store was constructed.
     *
     * @throws StoreInitializationException if no brokers were found
     */
    private void createSchemaTopic() throws StoreInitializationException {
        if (AdminUtils.topicExists(this.zkUtils, this.topic)) {
            this.verifySchemaTopic();
            return;
        }
        int numLiveBrokers = this.brokerSeq.size();
        if (numLiveBrokers <= 0) {
            throw new StoreInitializationException("No live Kafka brokers");
        }
        int schemaTopicReplicationFactor = Math.min(numLiveBrokers, this.desiredReplicationFactor);
        if (schemaTopicReplicationFactor < this.desiredReplicationFactor) {
            log.warn("Creating the schema topic {} using a replication factor of {}, which is less than the desired one of {}. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.", this.topic, schemaTopicReplicationFactor, this.desiredReplicationFactor);
        }
        Properties schemaTopicProps = new Properties();
        schemaTopicProps.put(LogConfig.CleanupPolicyProp(), "compact");
        try {
            AdminUtils.createTopic(this.zkUtils, this.topic, 1, schemaTopicReplicationFactor, schemaTopicProps, RackAwareMode.Enforced$.MODULE$);
        }
        catch (TopicExistsException ignored) {
            // Deliberately tolerated: the topic appeared between the existence check above and
            // this create call, so there is nothing left to create.
            log.debug("Schema topic {} already exists; skipping creation", this.topic);
        }
    }

    /**
     * Renders every endpoint of every broker as {@code PROTOCOL://host:port}. An endpoint with
     * a null host is rendered as {@code PROTOCOL://:port}.
     *
     * @param brokers brokers whose endpoints should be listed
     * @return one entry per broker endpoint, in iteration order
     */
    static List<String> brokersToEndpoints(List<Broker> brokers) {
        List<String> endpoints = new LinkedList<String>();
        for (Broker broker : brokers) {
            for (EndPoint ep : JavaConversions.asJavaCollection(broker.endPoints())) {
                String hostport = ep.host() == null ? ":" + ep.port() : Utils.formatAddress(ep.host(), ep.port());
                endpoints.add(ep.securityProtocol() + "://" + hostport);
            }
        }
        return endpoints;
    }

    /**
     * Joins the endpoints whose URL prefix matches {@code securityProtocol} into a
     * comma-separated bootstrap server string. Non-matching endpoints are logged and skipped.
     *
     * @param endpoints        candidate endpoints in {@code PROTOCOL://host:port} form
     * @param securityProtocol name of the security protocol endpoints must use
     * @return comma-separated list of the matching endpoints
     * @throws ConfigException if the protocol is not supported or no endpoint matches it
     */
    static String endpointsToBootstrapServers(List<String> endpoints, String securityProtocol) {
        if (!SUPPORTED_SECURITY_PROTOCOLS.contains(SecurityProtocol.forName(securityProtocol))) {
            throw new ConfigException("Only PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL Kafka endpoints are supported.");
        }
        String securityProtocolUrlPrefix = securityProtocol + "://";
        StringBuilder sb = new StringBuilder();
        for (String endpoint : endpoints) {
            if (!endpoint.startsWith(securityProtocolUrlPrefix)) {
                log.warn("Ignoring Kafka broker endpoint {} that does not match the setting for kafkastore.security.protocol={}", endpoint, securityProtocol);
                continue;
            }
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(endpoint);
        }
        if (sb.length() == 0) {
            throw new ConfigException("No supported Kafka endpoints are configured. Either kafkastore.bootstrap.servers must have at least one endpoint matching kafkastore.security.protocol or broker endpoints loaded from ZooKeeper must have at least one endpoint matching kafkastore.security.protocol.");
        }
        return sb.toString();
    }

    /**
     * Checks an existing schema topic: warns when it has more than one partition or fewer
     * replicas than desired, and fails when its cleanup policy is not {@code compact}.
     *
     * @throws IllegalStateException if the topic's cleanup policy is not {@code compact}
     */
    private void verifySchemaTopic() {
        Set<String> topics = new HashSet<String>();
        topics.add(this.topic);

        // Fully qualified because the file imports both java.util.Map and scala.collection.Map.
        scala.collection.Map partitionAssignment = this.zkUtils.getPartitionAssignmentForTopics(JavaConversions.asScalaSet(topics).toSeq()).get(this.topic).get();
        if (partitionAssignment.size() != 1) {
            log.warn("The schema topic {} should have only 1 partition.", this.topic);
        }
        // The replica list of partition 0 gives the topic's current replication factor.
        if (((Seq)partitionAssignment.get(0).get()).size() < this.desiredReplicationFactor) {
            log.warn("The replication factor of the schema topic {} is less than the desired one of {}. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.", this.topic, this.desiredReplicationFactor);
        }

        Properties prop = AdminUtils.fetchEntityConfig(this.zkUtils, ConfigType.Topic(), this.topic);
        String retentionPolicy = prop.getProperty(LogConfig.CleanupPolicyProp());
        if (!"compact".equals(retentionPolicy)) {
            log.error("The retention policy of the schema topic {} is incorrect. You must configure the topic to 'compact' cleanup policy to avoid Kafka deleting your schemas after a week. Refer to Kafka documentation for more details on cleanup policies", this.topic);
            throw new IllegalStateException("The retention policy of the schema topic " + this.topic + " is incorrect. Expected cleanup.policy to be 'compact' but it is " + retentionPolicy);
        }
    }

    /**
     * Blocks until the reader thread has reached the latest known offset of the topic.
     *
     * @param timeoutMs timeout in milliseconds, applied both to discovering the latest offset
     *                  and to waiting for the reader
     * @throws StoreException if the latest offset cannot be determined or the wait fails
     */
    public void waitUntilKafkaReaderReachesLastOffset(int timeoutMs) throws StoreException {
        long offsetOfLastMessage = this.getLatestOffset(timeoutMs);
        log.info("Wait to catch up until the offset of the last message at {}", offsetOfLastMessage);
        this.kafkaTopicReader.waitUntilOffset(offsetOfLastMessage, timeoutMs, TimeUnit.MILLISECONDS);
        log.debug("Reached offset at {}", offsetOfLastMessage);
    }

    /**
     * Forgets the cached last-written offset so the next call to
     * {@link #waitUntilKafkaReaderReachesLastOffset(int)} writes a fresh noop record.
     */
    public void markLastWrittenOffsetInvalid() {
        this.lastWrittenOffset = -1L;
    }

    /**
     * Returns the value the local store holds for {@code key}.
     *
     * @throws StoreException if the store is not initialized or the local store fails
     */
    @Override
    public V get(K key) throws StoreException {
        this.assertInitialized();
        return this.localStore.get(key);
    }

    /**
     * Produces {@code key}/{@code value} to partition 0 of the topic and waits, up to
     * {@code kafkastore.timeout.ms} per step, for the Kafka ack and for the reader thread to
     * reach the record's offset. A null {@code value} is sent as a null record value.
     *
     * @throws StoreTimeoutException if the Kafka ack does not arrive in time
     * @throws StoreException        if the store is not initialized, the key is null,
     *                               serialization fails, or the write fails or is interrupted
     */
    @Override
    public void put(K key, V value) throws StoreTimeoutException, StoreException {
        this.assertInitialized();
        if (key == null) {
            throw new StoreException("Key should not be null");
        }
        ProducerRecord<byte[], byte[]> producerRecord;
        try {
            producerRecord = new ProducerRecord<byte[], byte[]>(this.topic, 0, this.serializer.serializeKey(key), value == null ? null : this.serializer.serializeValue(value));
        }
        catch (SerializationException e) {
            throw new StoreException("Error serializing schema while creating the Kafka produce record", e);
        }
        boolean knownSuccessfulWrite = false;
        try {
            log.trace("Sending record to KafkaStore topic: {}", producerRecord);
            Future<RecordMetadata> ack = this.producer.send(producerRecord);
            RecordMetadata recordMetadata = ack.get(this.timeout, TimeUnit.MILLISECONDS);
            log.trace("Waiting for the local store to catch up to offset {}", recordMetadata.offset());
            this.lastWrittenOffset = recordMetadata.offset();
            this.kafkaTopicReader.waitUntilOffset(this.lastWrittenOffset, this.timeout, TimeUnit.MILLISECONDS);
            knownSuccessfulWrite = true;
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new StoreException("Put operation interrupted while waiting for an ack from Kafka", e);
        }
        catch (ExecutionException e) {
            throw new StoreException("Put operation failed while waiting for an ack from Kafka", e);
        }
        catch (TimeoutException e) {
            throw new StoreTimeoutException("Put operation timed out while waiting for an ack from Kafka", e);
        }
        catch (KafkaException ke) {
            throw new StoreException("Put operation to Kafka failed", ke);
        }
        finally {
            // After any failure the cached offset can no longer be trusted.
            if (!knownSuccessfulWrite) {
                this.lastWrittenOffset = -1L;
            }
        }
    }

    /**
     * Returns the local store's values for the given key range.
     *
     * @throws StoreException if the store is not initialized or the local store fails
     */
    @Override
    public Iterator<V> getAll(K key1, K key2) throws StoreException {
        this.assertInitialized();
        return this.localStore.getAll(key1, key2);
    }

    /**
     * Writes each entry with {@link #put}, one at a time; a failure stops the iteration and
     * leaves earlier entries written.
     *
     * @throws StoreException if the store is not initialized or any individual put fails
     */
    @Override
    public void putAll(java.util.Map<K, V> entries) throws StoreException {
        this.assertInitialized();
        // Fully qualified because the file imports both java.util.Map and scala.collection.Map.
        for (java.util.Map.Entry<K, V> entry : entries.entrySet()) {
            this.put(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Deletes {@code key} by writing a record with a null value for it.
     *
     * @throws StoreException if the store is not initialized or the write fails
     */
    @Override
    public void delete(K key) throws StoreException {
        this.assertInitialized();
        this.put(key, null);
    }

    /**
     * Returns all keys held by the local store.
     *
     * @throws StoreException if the store is not initialized or the local store fails
     */
    @Override
    public Iterator<K> getAllKeys() throws StoreException {
        // Same guard as every other accessor: do not serve reads before init() has completed.
        this.assertInitialized();
        return this.localStore.getAllKeys();
    }

    /**
     * Shuts down the reader thread, the producer, the ZooKeeper client and the local store, in
     * that order. Safe to call even when {@link #init()} never ran or failed part-way.
     */
    @Override
    public void close() {
        if (this.kafkaTopicReader != null) {
            this.kafkaTopicReader.shutdown();
            log.debug("Kafka store reader thread shut down");
        }
        if (this.producer != null) {
            this.producer.close();
            log.debug("Kafka store producer shut down");
        }
        this.zkUtils.close();
        log.debug("Kafka store zookeeper client shut down");
        this.localStore.close();
        log.debug("Kafka store shut down complete");
    }

    /**
     * Returns the reader thread, or null if {@link #init()} has not created it yet.
     */
    KafkaStoreReaderThread<K, V> getKafkaStoreReaderThread() {
        return this.kafkaTopicReader;
    }

    /**
     * @throws StoreException if {@link #init()} has not completed successfully
     */
    private void assertInitialized() throws StoreException {
        if (!this.initialized.get()) {
            throw new StoreException("Illegal state. Store not initialized yet");
        }
    }

    /**
     * Returns the cached last-written offset if one is known; otherwise writes a noop record
     * (noop key, null value) to the topic and returns, and caches, that record's offset.
     *
     * @param timeoutMs how long to wait for the noop record's ack, in milliseconds
     * @throws StoreException if the noop key cannot be serialized or the write fails
     */
    private long getLatestOffset(int timeoutMs) throws StoreException {
        if (this.lastWrittenOffset >= 0L) {
            return this.lastWrittenOffset;
        }
        ProducerRecord<byte[], byte[]> producerRecord;
        try {
            producerRecord = new ProducerRecord<byte[], byte[]>(this.topic, 0, this.serializer.serializeKey(this.noopKey), null);
        }
        catch (SerializationException e) {
            throw new StoreException("Failed to serialize noop key.", e);
        }
        try {
            log.trace("Sending Noop record to KafkaStore to find last offset.");
            Future<RecordMetadata> ack = this.producer.send(producerRecord);
            RecordMetadata metadata = ack.get(timeoutMs, TimeUnit.MILLISECONDS);
            this.lastWrittenOffset = metadata.offset();
            log.trace("Noop record's offset is {}", this.lastWrittenOffset);
            return this.lastWrittenOffset;
        }
        catch (Exception e) {
            // The broad catch also covers InterruptedException; keep the interrupt flag set.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new StoreException("Failed to write Noop record to kafka store.", e);
        }
    }
}

