/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    protected ExecutorService executor;
    private final KafkaEndpoint endpoint;
    private final Processor processor;

    public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
        this.processor = processor;
        if (endpoint.getBrokers() == null) {
            throw new IllegalArgumentException("BootStrap servers must be specified");
        }
        if (endpoint.getGroupId() == null) {
            throw new IllegalArgumentException("groupId must not be null");
        }
    }

    Properties getProps() {
        Properties props = this.endpoint.getConfiguration().createConsumerProperties();
        this.endpoint.updateClassProperties(props);
        props.put("bootstrap.servers", this.endpoint.getBrokers());
        props.put("group.id", this.endpoint.getGroupId());
        return props;
    }

    protected void doStart() throws Exception {
        super.doStart();
        LOG.info("Starting Kafka consumer");
        this.executor = this.endpoint.createExecutor();
        for (int i = 0; i < this.endpoint.getConsumersCount(); ++i) {
            this.executor.submit(new KafkaFetchRecords(this.endpoint.getTopic(), i + "", this.getProps()));
        }
    }

    protected void doStop() throws Exception {
        super.doStop();
        LOG.info("Stopping Kafka consumer");
        if (this.executor != null) {
            if (this.getEndpoint() != null && this.getEndpoint().getCamelContext() != null) {
                this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            } else {
                this.executor.shutdownNow();
            }
        }
        this.executor = null;
    }

    class KafkaFetchRecords
    implements Runnable {
        private final org.apache.kafka.clients.consumer.KafkaConsumer consumer;
        private final String topicName;
        private final String threadId;
        private final Properties kafkaProps;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        KafkaFetchRecords(String topicName, String id, Properties kafkaProps) {
            this.topicName = topicName;
            this.threadId = topicName + "-" + "Thread " + id;
            this.kafkaProps = kafkaProps;
            ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(null);
                this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
            }
            finally {
                Thread.currentThread().setContextClassLoader(threadClassLoader);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            int processed = 0;
            try {
                LOG.debug("Subscribing {} to topic {}", (Object)this.threadId, (Object)this.topicName);
                this.consumer.subscribe(Arrays.asList(this.topicName.split(",")));
                while (KafkaConsumer.this.isRunAllowed() && !KafkaConsumer.this.isSuspendingOrSuspended()) {
                    ConsumerRecords records = this.consumer.poll(Long.MAX_VALUE);
                    for (ConsumerRecord record : records) {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("offset = {}, key = {}, value = {}", new Object[]{record.offset(), record.key(), record.value()});
                        }
                        Exchange exchange = KafkaConsumer.this.endpoint.createKafkaExchange(record);
                        try {
                            KafkaConsumer.this.processor.process(exchange);
                        }
                        catch (Exception e) {
                            KafkaConsumer.this.getExceptionHandler().handleException("Error during processing", exchange, (Throwable)e);
                        }
                        if (KafkaConsumer.this.endpoint.isAutoCommitEnable() == null || KafkaConsumer.this.endpoint.isAutoCommitEnable().booleanValue() || ++processed < KafkaConsumer.this.endpoint.getBatchSize()) continue;
                        this.consumer.commitSync();
                        processed = 0;
                    }
                }
                LOG.debug("Unsubscribing {} from topic {}", (Object)this.threadId, (Object)this.topicName);
                this.consumer.unsubscribe();
                LOG.debug("Closing {} ", (Object)this.threadId);
                this.consumer.close();
            }
            catch (InterruptException e) {
                KafkaConsumer.this.getExceptionHandler().handleException("Interrupted while consuming " + this.threadId + " from kafka topic", (Throwable)e);
                this.consumer.unsubscribe();
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                KafkaConsumer.this.getExceptionHandler().handleException("Error consuming " + this.threadId + " from kafka topic", (Throwable)e);
            }
            finally {
                LOG.debug("Closing {} ", (Object)this.threadId);
                this.consumer.close();
            }
        }
    }
}

