/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka;

import java.io.Closeable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.KafkaConsumerFatalException;
import org.apache.camel.component.kafka.PollExceptionStrategy;
import org.apache.camel.component.kafka.SeekPolicy;
import org.apache.camel.component.kafka.TaskHealthState;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.CommitManagers;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.component.kafka.consumer.support.classic.ClassicRebalanceListener;
import org.apache.camel.component.kafka.consumer.support.resume.ResumeRebalanceListener;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.task.ForegroundTask;
import org.apache.camel.support.task.Tasks;
import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.support.task.budget.IterationBudget;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ReflectionHelper;
import org.apache.camel.util.TimeUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaFetchRecords
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class);
    private final KafkaConsumer kafkaConsumer;
    private Consumer consumer;
    private volatile String clientId;
    private final String topicName;
    private final Pattern topicPattern;
    private final String threadId;
    private final Properties kafkaProps;
    private PollExceptionStrategy pollExceptionStrategy;
    private final BridgeExceptionHandlerToErrorHandler bridge;
    private final ReentrantLock lock = new ReentrantLock();
    private CommitManager commitManager;
    private volatile Exception lastError;
    private final KafkaConsumerListener consumerListener;
    private volatile boolean terminated;
    private volatile long currentBackoffInterval;
    private volatile boolean reconnect;
    private volatile boolean connected;
    private volatile State state = State.RUNNING;

    KafkaFetchRecords(KafkaConsumer kafkaConsumer, BridgeExceptionHandlerToErrorHandler bridge, String topicName, Pattern topicPattern, String id, Properties kafkaProps, KafkaConsumerListener consumerListener) {
        this.kafkaConsumer = kafkaConsumer;
        this.bridge = bridge;
        this.topicName = topicName;
        this.topicPattern = topicPattern;
        this.consumerListener = consumerListener;
        this.threadId = topicName + "-Thread " + id;
        this.kafkaProps = kafkaProps;
    }

    @Override
    public void run() {
        if (!this.isKafkaConsumerRunnable()) {
            return;
        }
        do {
            this.terminated = false;
            if (!this.isConnected()) {
                this.currentBackoffInterval = this.kafkaConsumer.getEndpoint().getComponent().getCreateConsumerBackoffInterval();
                ForegroundTask task = Tasks.foregroundTask().withName("Create KafkaConsumer").withBudget((IterationBudget)Budgets.iterationBudget().withMaxIterations(this.kafkaConsumer.getEndpoint().getComponent().getCreateConsumerBackoffMaxAttempts()).withInitialDelay(Duration.ZERO).withInterval(Duration.ofMillis(this.currentBackoffInterval)).build()).build();
                boolean success = task.run(this::createConsumerTask);
                if (!success) {
                    int max = this.kafkaConsumer.getEndpoint().getComponent().getCreateConsumerBackoffMaxAttempts();
                    this.setupCreateConsumerException(task, max);
                    this.terminated = true;
                    break;
                }
                this.currentBackoffInterval = this.kafkaConsumer.getEndpoint().getComponent().getSubscribeConsumerBackoffInterval();
                task = Tasks.foregroundTask().withName("Subscribe KafkaConsumer").withBudget((IterationBudget)Budgets.iterationBudget().withMaxIterations(this.kafkaConsumer.getEndpoint().getComponent().getSubscribeConsumerBackoffMaxAttempts()).withInitialDelay(Duration.ZERO).withInterval(Duration.ofMillis(this.currentBackoffInterval)).build()).build();
                success = task.run(this::initializeConsumerTask);
                if (!success) {
                    int max = this.kafkaConsumer.getEndpoint().getComponent().getCreateConsumerBackoffMaxAttempts();
                    this.setupInitializeErrorException(task, max);
                    this.terminated = true;
                    break;
                }
                this.setConnected(true);
            }
            this.setLastError(null);
            this.startPolling();
        } while ((this.pollExceptionStrategy.canContinue() || this.isReconnect()) && this.isKafkaConsumerRunnable());
        if (LOG.isInfoEnabled()) {
            LOG.info("Terminating KafkaConsumer thread {} receiving from {}", (Object)this.threadId, (Object)this.getPrintableTopic());
        }
        this.safeConsumerClose();
    }

    private void setupInitializeErrorException(ForegroundTask task, int max) {
        String time = TimeUtils.printDuration((Duration)task.elapsed(), (boolean)true);
        String topic = this.getPrintableTopic();
        String msg = "Gave up subscribing org.apache.kafka.clients.consumer.KafkaConsumer " + this.threadId + " to " + topic + " after " + max + " attempts (elapsed: " + time + ").";
        LOG.warn(msg);
        this.setLastError(new KafkaConsumerFatalException(msg, this.lastError));
    }

    private void setupCreateConsumerException(ForegroundTask task, int max) {
        String time = TimeUtils.printDuration((Duration)task.elapsed(), (boolean)true);
        String topic = this.getPrintableTopic();
        String msg = "Gave up creating org.apache.kafka.clients.consumer.KafkaConsumer " + this.threadId + " to " + topic + " after " + max + " attempts (elapsed: " + time + ").";
        this.setLastError(new KafkaConsumerFatalException(msg, this.lastError));
    }

    private boolean initializeConsumerTask() {
        try {
            this.initializeConsumer();
        }
        catch (Exception e) {
            this.setConnected(false);
            LOG.warn("Error subscribing org.apache.kafka.clients.consumer.KafkaConsumer due to: {}", (Object)e.getMessage(), (Object)e);
            this.setLastError(e);
            return false;
        }
        return true;
    }

    private boolean createConsumerTask() {
        try {
            this.createConsumer();
            this.commitManager = CommitManagers.createCommitManager(this.consumer, this.kafkaConsumer, this.threadId, this.getPrintableTopic());
            if (this.consumerListener != null) {
                this.consumerListener.setConsumer(this.consumer);
                SeekPolicy seekPolicy = this.kafkaConsumer.getEndpoint().getConfiguration().getSeekTo();
                if (seekPolicy == null && (seekPolicy = this.kafkaConsumer.getEndpoint().getComponent().getConfiguration().getSeekTo()) == null) {
                    seekPolicy = SeekPolicy.BEGINNING;
                }
                this.consumerListener.setSeekPolicy(seekPolicy);
            }
        }
        catch (Exception e) {
            this.setConnected(false);
            LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due to: {}", (Object)e.getMessage(), (Object)e);
            this.setLastError(e);
            return false;
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void createConsumer() {
        ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
            long delay = this.kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
            String prefix = this.consumer == null ? "Connecting" : "Reconnecting";
            LOG.info("{} Kafka consumer thread ID {} with poll timeout of {} ms", new Object[]{prefix, this.threadId, delay});
            this.consumer = this.kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(this.kafkaProps);
            if (this.clientId == null) {
                this.clientId = this.getKafkaProps().getProperty("client.id");
                if (this.clientId == null) {
                    try {
                        this.clientId = (String)ReflectionHelper.getField((Field)this.consumer.getClass().getDeclaredField("clientId"), (Object)this.consumer);
                    }
                    catch (Exception e) {
                        this.clientId = "";
                    }
                }
            }
            this.pollExceptionStrategy = KafkaErrorStrategies.strategies(this, this.kafkaConsumer.getEndpoint(), this.consumer);
        }
        finally {
            Thread.currentThread().setContextClassLoader(threadClassLoader);
        }
    }

    private void initializeConsumer() {
        this.subscribe();
        this.setConnected(false);
        this.pollExceptionStrategy.reset();
    }

    private void subscribe() {
        Object listener = this.kafkaConsumer.getResumeStrategy() == null ? new ClassicRebalanceListener(this.threadId, this.kafkaConsumer.getEndpoint().getConfiguration(), this.commitManager, this.consumer) : new ResumeRebalanceListener(this.threadId, this.kafkaConsumer.getEndpoint().getConfiguration(), this.commitManager, this.consumer, this.kafkaConsumer.getResumeStrategy());
        if (LOG.isInfoEnabled()) {
            LOG.info("Subscribing {} to {}", (Object)this.threadId, (Object)this.getPrintableTopic());
        }
        if (this.topicPattern != null) {
            this.consumer.subscribe(this.topicPattern, (ConsumerRebalanceListener)listener);
        } else {
            this.consumer.subscribe(Arrays.asList(this.topicName.split(",")), (ConsumerRebalanceListener)listener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startPolling() {
        long partitionLastOffset = -1L;
        try {
            this.lock.lock();
            long pollTimeoutMs = this.kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Polling {} from {} with timeout: {}", new Object[]{this.threadId, this.getPrintableTopic(), pollTimeoutMs});
            }
            KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade(this.kafkaConsumer, this.threadId, this.commitManager, this.consumerListener);
            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
            ProcessingResult lastResult = null;
            while (this.isKafkaConsumerRunnableAndNotStopped() && this.isConnected() && this.pollExceptionStrategy.canContinue()) {
                ProcessingResult result;
                ConsumerRecords allRecords = this.consumer.poll(pollDuration);
                if (this.consumerListener != null && !this.consumerListener.afterConsume(this.consumer)) continue;
                if (lastResult != null) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("This polling iteration is using lastresult on partition {} and offset {}", (Object)lastResult.getPartition(), (Object)lastResult.getPartitionLastOffset());
                    }
                } else if (LOG.isTraceEnabled()) {
                    LOG.trace("This polling iteration is using lastresult of null");
                }
                if ((result = recordProcessorFacade.processPolledRecords((ConsumerRecords<Object, Object>)allRecords, lastResult)) != null) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("This polling iteration had a result returned for partition {} and offset {}", (Object)result.getPartition(), (Object)result.getPartitionLastOffset());
                    }
                } else if (LOG.isTraceEnabled()) {
                    LOG.trace("This polling iteration had a result returned as null");
                }
                this.updateTaskState();
                if (result != null && result.isBreakOnErrorHit() && !this.state.equals((Object)State.PAUSED)) {
                    LOG.debug("We hit an error ... setting flags to force reconnect");
                    this.setReconnect(true);
                    this.setConnected(false);
                    continue;
                }
                lastResult = result;
                if (!LOG.isTraceEnabled()) continue;
                LOG.trace("Setting lastresult to partition {} and offset {}", (Object)lastResult.getPartition(), (Object)lastResult.getPartitionLastOffset());
            }
            if (!this.isConnected()) {
                LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
                this.commitManager.commit();
            }
            this.safeUnsubscribe();
        }
        catch (InterruptException e) {
            this.kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + this.threadId + " from kafka topic", (Throwable)e);
            this.commitManager.commit();
            LOG.info("Unsubscribing {} from {}", (Object)this.threadId, (Object)this.getPrintableTopic());
            this.safeUnsubscribe();
            Thread.currentThread().interrupt();
        }
        catch (WakeupException e) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("The kafka consumer was woken up while polling on thread {} for {}", (Object)this.threadId, (Object)this.getPrintableTopic());
            }
        }
        catch (Exception e) {
            if (LOG.isDebugEnabled()) {
                LOG.warn("Exception {} caught by thread {} while polling {} from kafka: {}", new Object[]{e.getClass().getName(), this.threadId, this.getPrintableTopic(), e.getMessage(), e});
            } else {
                LOG.warn("Exception {} caught by thread {} while polling {} from kafka: {}", new Object[]{e.getClass().getName(), this.threadId, this.getPrintableTopic(), e.getMessage()});
            }
            this.pollExceptionStrategy.handle(partitionLastOffset, e);
        }
        finally {
            if (!this.pollExceptionStrategy.canContinue()) {
                this.safeUnsubscribe();
                this.safeConsumerClose();
            }
            this.lock.unlock();
        }
    }

    private void updateTaskState() {
        switch (this.state) {
            case PAUSE_REQUESTED: {
                LOG.info("Pausing the consumer as a response to a pause request");
                this.consumer.pause((Collection)this.consumer.assignment());
                this.state = State.PAUSED;
                break;
            }
            case RESUME_REQUESTED: {
                LOG.info("Resuming the consumer as a response to a resume request");
                if (this.consumer.committed(this.consumer.assignment()) != null) {
                    this.consumer.committed(this.consumer.assignment()).forEach((k, v) -> {
                        if (v != null) {
                            TopicPartition tp = (TopicPartition)k;
                            LOG.info("Resuming from the offset {} for the topic {} with partition {}", new Object[]{((OffsetAndMetadata)v).offset(), tp.topic(), tp.partition()});
                            this.consumer.seek(tp, ((OffsetAndMetadata)v).offset());
                        }
                    });
                }
                this.consumer.resume((Collection)this.consumer.assignment());
                this.state = State.RUNNING;
                break;
            }
        }
    }

    private void safeConsumerClose() {
        try {
            LOG.debug("Closing consumer {}", (Object)this.threadId);
            IOHelper.close((Closeable)this.consumer, (String)("Kafka consumer (thread ID " + this.threadId + ")"), (Logger)LOG);
        }
        catch (Exception e) {
            LOG.error("Error closing the Kafka consumer: {} (this error will be ignored)", (Object)e.getMessage(), (Object)e);
        }
    }

    private void safeUnsubscribe() {
        if (this.consumer == null) {
            return;
        }
        String printableTopic = this.getPrintableTopic();
        try {
            LOG.debug("Unsubscribing from Kafka");
            this.consumer.unsubscribe();
            LOG.debug("Done unsubscribing from Kafka");
        }
        catch (IllegalStateException e) {
            LOG.warn("The consumer is likely already closed. Skipping unsubscribing thread {} from kafka {}", (Object)this.threadId, (Object)printableTopic);
        }
        catch (Exception e) {
            LOG.debug("Something went wrong while unsubscribing from Kafka: {}", (Object)e.getMessage());
            this.kafkaConsumer.getExceptionHandler().handleException("Error unsubscribing thread " + this.threadId + " from kafka " + printableTopic, (Throwable)e);
        }
    }

    private String getPrintableTopic() {
        if (this.topicPattern != null) {
            return "topic pattern " + this.topicPattern;
        }
        return "topic " + this.topicName;
    }

    private boolean isKafkaConsumerRunnable() {
        return this.kafkaConsumer.isRunAllowed() && !this.kafkaConsumer.isStoppingOrStopped() && !this.kafkaConsumer.isSuspendingOrSuspended();
    }

    private boolean isKafkaConsumerRunnableAndNotStopped() {
        return this.kafkaConsumer.isRunAllowed() && !this.kafkaConsumer.isStoppingOrStopped();
    }

    private boolean isReconnect() {
        return this.reconnect;
    }

    public void setReconnect(boolean value) {
        this.reconnect = value;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void safeStop() {
        if (this.consumer == null) {
            return;
        }
        long timeout = this.kafkaConsumer.getEndpoint().getConfiguration().getShutdownTimeout();
        try {
            LOG.info("Waiting up to {} milliseconds for the processing to finish", (Object)timeout);
            if (!this.lock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
                LOG.warn("The processing of the current record did not finish within {} seconds", (Object)timeout);
            }
            this.consumer.wakeup();
        }
        catch (InterruptedException e) {
            this.consumer.wakeup();
            Thread.currentThread().interrupt();
        }
        finally {
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    void stop() {
        this.safeStop();
    }

    public boolean isConnected() {
        return this.connected;
    }

    public boolean isPaused() {
        return !this.consumer.paused().isEmpty();
    }

    public void setConnected(boolean connected) {
        this.connected = connected;
    }

    private boolean isReady() {
        if (!this.connected) {
            return false;
        }
        boolean ready = true;
        try {
            if (this.consumer instanceof org.apache.kafka.clients.consumer.KafkaConsumer) {
                org.apache.kafka.clients.consumer.KafkaConsumer kc = (org.apache.kafka.clients.consumer.KafkaConsumer)this.consumer;
                ConsumerNetworkClient nc = (ConsumerNetworkClient)ReflectionHelper.getField((Field)kc.getClass().getDeclaredField("client"), (Object)kc);
                LOG.trace("Health-Check calling org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.hasReadyNode");
                ready = nc.hasReadyNodes(System.currentTimeMillis());
            }
        }
        catch (Exception e) {
            LOG.debug("Cannot check hasReadyNodes on KafkaConsumer client (ConsumerNetworkClient) due to: " + e.getMessage() + ". This exception is ignored.", (Throwable)e);
        }
        return ready;
    }

    private Properties getKafkaProps() {
        return this.kafkaProps;
    }

    private boolean isTerminated() {
        return this.terminated;
    }

    private boolean isRecoverable() {
        return (this.pollExceptionStrategy != null && this.pollExceptionStrategy.canContinue() || this.isReconnect()) && this.isKafkaConsumerRunnable();
    }

    public TaskHealthState healthState() {
        return new TaskHealthState(this.isReady(), this.isTerminated(), this.isRecoverable(), this.lastError, this.clientId, this.currentBackoffInterval, this.kafkaProps);
    }

    public BridgeExceptionHandlerToErrorHandler getBridge() {
        return this.bridge;
    }

    public void pause() {
        LOG.info("A pause request was issued and the consumer thread will pause after current processing has finished");
        this.state = State.PAUSE_REQUESTED;
    }

    public void resume() {
        LOG.info("A resume request was issued and the consumer thread will resume after current processing has finished");
        this.state = State.RESUME_REQUESTED;
    }

    private synchronized void setLastError(Exception lastError) {
        this.lastError = lastError;
    }

    private static enum State {
        RUNNING,
        PAUSE_REQUESTED,
        PAUSED,
        RESUME_REQUESTED;

    }
}

