/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.storage;

import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.KafkaStore;
import io.confluent.kafka.schemaregistry.storage.Store;
import io.confluent.kafka.schemaregistry.storage.StoreUpdateHandler;
import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background thread that tails the single-partition Kafka topic backing a
 * {@link KafkaStore} and applies every record it reads to a local {@link Store}.
 *
 * <p>Each call to {@link #doWork()} polls the topic once, deserializes the
 * records with the supplied {@link Serializer}, applies them to the local
 * store, notifies the {@link StoreUpdateHandler}, and publishes the offset of
 * the last applied record. Other threads can block in
 * {@link #waitUntilOffset(long, long, TimeUnit)} until a given offset has been
 * published.
 *
 * <p>The last consumed offset is guarded by {@code offsetUpdateLock}; it is
 * only read or written while that lock is held.
 *
 * @param <K> key type stored in the local store
 * @param <V> value type stored in the local store
 */
public class KafkaStoreReaderThread<K, V>
extends ShutdownableThread {
    private static final Logger log = LoggerFactory.getLogger(KafkaStoreReaderThread.class);
    /** Maximum number of partition-metadata lookups attempted while the topic is not yet visible. */
    private static final int MAX_PARTITION_LOOKUP_ATTEMPTS = 10;
    /** Pause between two failed partition-metadata lookups. */
    private static final long PARTITION_LOOKUP_BACKOFF_MS = 1000L;

    private final String topic;
    private final TopicPartition topicPartition;
    private final String groupId;
    private final StoreUpdateHandler<K, V> storeUpdateHandler;
    private final Serializer<K, V> serializer;
    private final Store<K, V> localStore;
    private final ReentrantLock offsetUpdateLock;
    private final Condition offsetReachedThreshold;
    private final Consumer<byte[], byte[]> consumer;
    /** Offset of the last record fully processed; -1 until the first record is handled. Guarded by offsetUpdateLock. */
    private long offsetInSchemasTopic = -1L;
    private final K noopKey;
    private final Properties consumerProps = new Properties();

    /**
     * Creates the reader, builds its Kafka consumer and positions it at the
     * beginning of partition 0 of {@code topic}.
     *
     * <p>Partition metadata is looked up up to {@value #MAX_PARTITION_LOOKUP_ATTEMPTS}
     * times, pausing {@value #PARTITION_LOOKUP_BACKOFF_MS} ms after each failed
     * lookup. If the calling thread is interrupted while pausing, the interrupt
     * flag is restored and no further lookups are attempted. The consumer is
     * closed again if construction fails after it has been created.
     *
     * @param bootstrapBrokers   value used for the consumer's {@code bootstrap.servers}
     * @param topic              topic to read; must have exactly one partition
     * @param groupId            value used for the consumer's {@code group.id}
     * @param storeUpdateHandler callback invoked after every update applied to the local store
     * @param serializer         used to deserialize record keys and values
     * @param localStore         store that receives the deserialized records
     * @param noopKey            key whose records only advance the consumed offset
     * @param config             source of the security protocol and security client settings
     * @throws IllegalArgumentException if no partition metadata could be obtained for the topic
     * @throws IllegalStateException    if the topic has more than one partition
     */
    public KafkaStoreReaderThread(String bootstrapBrokers, String topic, String groupId, StoreUpdateHandler<K, V> storeUpdateHandler, Serializer<K, V> serializer, Store<K, V> localStore, K noopKey, SchemaRegistryConfig config) {
        super("kafka-store-reader-thread-" + topic, false);
        this.offsetUpdateLock = new ReentrantLock();
        this.offsetReachedThreshold = this.offsetUpdateLock.newCondition();
        this.topic = topic;
        this.groupId = groupId;
        this.storeUpdateHandler = storeUpdateHandler;
        this.serializer = serializer;
        this.localStore = localStore;
        this.noopKey = noopKey;
        this.topicPartition = new TopicPartition(topic, 0);

        this.consumerProps.put("group.id", this.groupId);
        this.consumerProps.put("client.id", "KafkaStore-reader-" + this.topic);
        this.consumerProps.put("bootstrap.servers", bootstrapBrokers);
        this.consumerProps.put("auto.offset.reset", "earliest");
        this.consumerProps.put("enable.auto.commit", "false");
        this.consumerProps.put("key.deserializer", ByteArrayDeserializer.class);
        this.consumerProps.put("value.deserializer", ByteArrayDeserializer.class);
        this.consumerProps.put("security.protocol", config.getString("kafkastore.security.protocol"));
        KafkaStore.addSecurityConfigsToClientProperties(config, this.consumerProps);

        this.consumer = new KafkaConsumer<byte[], byte[]>(this.consumerProps);
        try {
            List<?> partitions = null;
            for (int attempt = 0; attempt < MAX_PARTITION_LOOKUP_ATTEMPTS; attempt++) {
                partitions = this.consumer.partitionsFor(this.topic);
                if (partitions != null && !partitions.isEmpty()) {
                    break;
                }
                try {
                    Thread.sleep(PARTITION_LOOKUP_BACKOFF_MS);
                }
                catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and stop retrying:
                    // any further sleep would be interrupted immediately anyway.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (partitions == null || partitions.isEmpty()) {
                throw new IllegalArgumentException("Unable to subscribe to the Kafka topic " + topic + " backing this data store. Topic may not exist.");
            }
            if (partitions.size() > 1) {
                throw new IllegalStateException("Unexpected number of partitions in the " + topic + " topic. Expected 1 and instead got " + partitions.size());
            }
            this.consumer.assign(Arrays.asList(this.topicPartition));
            this.consumer.seekToBeginning(Arrays.asList(this.topicPartition));
        }
        catch (RuntimeException e) {
            // The instance never escapes the constructor on failure, so nobody
            // else could release the consumer's sockets and buffers.
            try {
                this.consumer.close();
            }
            catch (RuntimeException closeFailure) {
                log.warn("Failed to close the Kafka consumer after a failed initialization", closeFailure);
            }
            throw e;
        }
        log.info("Initialized last consumed offset to {}", this.offsetInSchemasTopic);
        log.debug("Kafka store reader thread started with consumer properties {}", this.consumerProps);
    }

    /**
     * Polls the topic once and applies every returned record to the local store.
     *
     * <p>Per record:
     * <ul>
     *   <li>a key or value that fails to deserialize is logged and the record is skipped
     *       without advancing the consumed offset;</li>
     *   <li>a record whose key equals {@code noopKey} only advances the consumed offset;</li>
     *   <li>a {@code null} value deletes the key from the local store, any other value is
     *       put; the update handler is then notified and the consumed offset advanced;</li>
     *   <li>a {@link StoreException} from the local store is logged and the offset is not
     *       advanced for that record.</li>
     * </ul>
     *
     * @throws IllegalStateException if the consumer reports a {@link RecordTooLargeException}
     * @throws RuntimeException      wrapping any other runtime failure of the poll/apply cycle
     */
    public void doWork() {
        try {
            ConsumerRecords<byte[], byte[]> records = this.consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                K messageKey;
                try {
                    messageKey = this.serializer.deserializeKey(record.key());
                }
                catch (SerializationException e) {
                    log.error("Failed to deserialize the schema or config key", e);
                    continue;
                }
                if (messageKey.equals(this.noopKey)) {
                    // Noop records carry no update; they only move the consumed offset forward.
                    publishConsumedOffset(record.offset());
                    continue;
                }
                V message;
                try {
                    message = record.value() == null ? null : this.serializer.deserializeValue(messageKey, record.value());
                }
                catch (SerializationException e) {
                    log.error("Failed to deserialize a schema or config update", e);
                    continue;
                }
                try {
                    log.trace("Applying update ({},{}) to the local store", messageKey, message);
                    if (message == null) {
                        this.localStore.delete(messageKey);
                    } else {
                        this.localStore.put(messageKey, message);
                    }
                    this.storeUpdateHandler.handleUpdate(messageKey, message);
                    publishConsumedOffset(record.offset());
                }
                catch (StoreException se) {
                    log.error("Failed to add record from the Kafka topic " + this.topic + " to the local store", se);
                }
            }
        }
        catch (WakeupException ignored) {
            // Expected: shutdown() calls consumer.wakeup() to break out of the blocking poll.
        }
        catch (RecordTooLargeException rtle) {
            throw new IllegalStateException("Consumer threw RecordTooLargeException. A schema has been written that exceeds the default maximum fetch size.", rtle);
        }
        catch (RuntimeException e) {
            log.error("KafkaStoreReader thread has died for an unknown reason.", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the reader: requests shutdown, wakes the consumer out of a blocking
     * poll, waits for the thread to finish, then closes the local store and the
     * consumer.
     *
     * <p>The store is closed only after the thread has stopped so that an
     * in-flight {@link #doWork()} cannot write to an already closed store. The
     * consumer is closed even if closing the store fails.
     */
    public void shutdown() {
        log.debug("Starting shutdown of KafkaStoreReaderThread.");
        super.initiateShutdown();
        this.consumer.wakeup();
        super.awaitShutdown();
        try {
            if (this.localStore != null) {
                this.localStore.close();
            }
        }
        finally {
            this.consumer.close();
        }
        log.info("KafkaStoreReaderThread shutdown complete.");
    }

    /**
     * Blocks until the reader has processed the record at {@code offset} or the
     * timeout elapses.
     *
     * <p>An interrupt received while waiting does not abort the wait: it is
     * logged, the wait continues for the time remaining until the original
     * deadline, and the thread's interrupt flag is restored before returning.
     *
     * @param offset   offset that must have been consumed; must not be negative
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @throws StoreException if {@code offset} is negative, or (as a
     *         {@link StoreTimeoutException}) if the offset was not reached in time
     */
    public void waitUntilOffset(long offset, long timeout, TimeUnit timeUnit) throws StoreException {
        if (offset < 0L) {
            throw new StoreException("KafkaStoreReaderThread can't wait for a negative offset.");
        }
        long remainingNs = TimeUnit.NANOSECONDS.convert(timeout, timeUnit);
        // Wrap-around arithmetic on nanoTime is intentional; only differences are used.
        final long deadlineNs = System.nanoTime() + remainingNs;
        boolean interrupted = false;
        long offsetReached;
        this.offsetUpdateLock.lock();
        try {
            log.trace("Waiting to read offset {}. Currently at offset {}", offset, this.offsetInSchemasTopic);
            while (this.offsetInSchemasTopic < offset && remainingNs > 0L) {
                try {
                    remainingNs = this.offsetReachedThreshold.awaitNanos(remainingNs);
                }
                catch (InterruptedException e) {
                    log.debug("Interrupted while waiting for the background store reader thread to reach the specified offset: " + offset, e);
                    interrupted = true;
                    // awaitNanos did not report the time left, so derive it from the deadline
                    // to keep the overall wait within the requested timeout.
                    remainingNs = deadlineNs - System.nanoTime();
                }
            }
            // Snapshot under the lock so the check and the message below see one consistent value.
            offsetReached = this.offsetInSchemasTopic;
        }
        finally {
            this.offsetUpdateLock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (offsetReached < offset) {
            throw new StoreTimeoutException("KafkaStoreReaderThread failed to reach target offset within the timeout interval. targetOffset: " + offset + ", offsetReached: " + offsetReached + ", timeout(ms): " + TimeUnit.MILLISECONDS.convert(timeout, timeUnit));
        }
    }

    /**
     * Returns the string value of one of the consumer properties this reader was
     * configured with.
     *
     * @param key consumer property name, e.g. {@code "group.id"}
     * @return the property value, or {@code null} if the key is absent or its value
     *         is not a {@code String} (the deserializer entries are stored as classes)
     */
    public String getConsumerProperty(String key) {
        return this.consumerProps.getProperty(key);
    }

    /** Records {@code offset} as the last consumed offset and wakes every thread waiting for it. */
    private void publishConsumedOffset(long offset) {
        this.offsetUpdateLock.lock();
        try {
            this.offsetInSchemasTopic = offset;
            this.offsetReachedThreshold.signalAll();
        }
        finally {
            this.offsetUpdateLock.unlock();
        }
    }
}

