/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.couchdb;

import com.google.gson.JsonObject;
import java.time.Duration;
import org.apache.camel.Exchange;
import org.apache.camel.component.couchdb.CouchDbClientWrapper;
import org.apache.camel.component.couchdb.CouchDbConsumer;
import org.apache.camel.component.couchdb.CouchDbEndpoint;
import org.apache.camel.component.couchdb.consumer.CouchDbResumable;
import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy;
import org.apache.camel.component.couchdb.consumer.CouchDdResumeStrategyFactory;
import org.apache.camel.support.task.BlockingTask;
import org.apache.camel.support.task.Tasks;
import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.support.task.budget.IterationBudget;
import org.lightcouch.Changes;
import org.lightcouch.ChangesResult;
import org.lightcouch.CouchDbException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that follows the continuous CouchDB changes feed and hands every
 * matching change to the owning consumer's processor as an exchange.
 *
 * <p>{@link #run()} is expected to execute on a worker thread while
 * {@link #stop()} is called from another thread; the two fields shared between
 * them ({@code stopped} and {@code changes}) are therefore {@code volatile}.
 */
public class CouchDbChangesetTracker implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(CouchDbChangesetTracker.class);

    /** Maximum number of stability checks attempted after the feed fails. */
    private static final int MAX_DB_ERROR_REPEATS = 8;

    /** Pause between two consecutive stability checks. */
    private static final Duration STABILITY_CHECK_INTERVAL = Duration.ofSeconds(3L);

    private final CouchDbClientWrapper couchClient;
    private final CouchDbEndpoint endpoint;
    private final CouchDbConsumer consumer;

    /** Set once the tracker should not read (or re-open) the feed any more. */
    private volatile boolean stopped;

    /** Current changes feed; {@code null} until {@link #run()} has opened it. */
    private volatile Changes changes;

    /**
     * Creates a tracker; the changes feed itself is only opened by {@link #run()}.
     *
     * @param endpoint    endpoint supplying the feed options (style, heartbeat, deletes/updates filter)
     * @param consumer    consumer used to create, process and release exchanges
     * @param couchClient client wrapper used to open the feed and to probe the server
     */
    public CouchDbChangesetTracker(CouchDbEndpoint endpoint, CouchDbConsumer consumer, CouchDbClientWrapper couchClient) {
        this.endpoint = endpoint;
        this.consumer = consumer;
        this.couchClient = couchClient;
    }

    /**
     * Opens a new continuous changes feed and stores it in {@code changes}.
     *
     * @param sequence sequence to resume from; when {@code null} the endpoint's
     *                 resume strategy is asked to populate the resumable before
     *                 the feed is opened
     */
    private void initChanges(String sequence) {
        CouchDbResumable resumable = new CouchDbResumable(this.couchClient, sequence);
        if (sequence == null) {
            CouchDbResumeStrategy resumeStrategy = CouchDdResumeStrategyFactory.newResumeStrategy(this.endpoint);
            resumeStrategy.resume(resumable);
        }
        LOG.debug("Last sequence [{}]", resumable.getLastOffset());
        this.changes = this.couchClient.changes()
                .style(this.endpoint.getStyle())
                .includeDocs(true)
                .since(resumable.getLastOffset())
                .heartBeat(this.endpoint.getHeartbeat())
                .continuousChanges();
    }

    /**
     * Reads the changes feed until it ends, {@link #stop()} is called, or the
     * feed fails and cannot be re-established.
     *
     * <p>Rows are filtered by the endpoint's deletes/updates settings. A failure
     * while processing a single exchange is passed to the consumer's exception
     * handler and does not end the loop. A {@link CouchDbException} from the feed
     * triggers {@link #waitForStability(String)}; if that does not succeed the
     * exception is logged and the tracker exits.
     */
    @Override
    public void run() {
        if (this.stopped) {
            // Stop was requested before the task started: do not open a feed at all.
            return;
        }
        String lastSequence = null;
        this.initChanges(null);
        try {
            while (!this.stopped) {
                try {
                    while (this.changes.hasNext()) {
                        ChangesResult.Row feed = this.changes.next();
                        boolean deleted = feed.isDeleted();
                        // Skip deletions when deletes are disabled and non-deletions when updates are disabled.
                        if (deleted ? !this.endpoint.isDeletes() : !this.endpoint.isUpdates()) {
                            continue;
                        }
                        lastSequence = feed.getSeq();
                        JsonObject doc = feed.getDoc();
                        Exchange exchange = this.consumer.createExchange(lastSequence, feed.getId(), doc, deleted);
                        LOG.trace("Created exchange [exchange={}, _id={}, seq={}]", exchange, feed.getId(), lastSequence);
                        try {
                            this.consumer.getProcessor().process(exchange);
                        } catch (Exception e) {
                            this.consumer.getExceptionHandler().handleException("Error processing exchange.", exchange, e);
                        } finally {
                            this.consumer.releaseExchange(exchange, false);
                        }
                    }
                    // The feed reported no further rows: nothing left to track.
                    this.stopped = true;
                } catch (CouchDbException e) {
                    if (this.stopped) {
                        // A failure after stop() is not worth recovering from; re-opening
                        // the feed here would keep the tracker alive past its stop request.
                        LOG.debug("CouchDb change feed failed after stop was requested; not attempting recovery", e);
                        break;
                    }
                    LOG.debug("CouchDb Exception encountered waiting for changes!  Attempting to recover...", e);
                    if (this.endpoint.isRunAllowed() || !this.endpoint.isShutdown() || !this.consumer.isStopped()) {
                        if (this.waitForStability(lastSequence)) {
                            continue;
                        }
                        throw e;
                    }
                    LOG.debug("Skipping the stability check because shutting down or running is not allowed at the moment");
                    // Without recovery the feed stays broken; exit instead of polling it again.
                    this.stopped = true;
                }
            }
        } catch (Exception e) {
            LOG.error("Unexpected error causing CouchDb change tracker to exit!", e);
        }
    }

    /**
     * Repeatedly runs {@link #stabilityCheck(String)} in a foreground task, at most
     * {@link #MAX_DB_ERROR_REPEATS} times with {@link #STABILITY_CHECK_INTERVAL}
     * between iterations.
     *
     * @param lastSequence sequence of the last exchange created, or {@code null} if none
     * @return the result of the task run; {@code true} indicates a check succeeded
     */
    private boolean waitForStability(String lastSequence) {
        BlockingTask task = Tasks.foregroundTask()
                .withBudget(Budgets.iterationBudget()
                        .withMaxIterations(MAX_DB_ERROR_REPEATS)
                        .withInterval(STABILITY_CHECK_INTERVAL)
                        .build())
                .withName("couchdb-wait-for-stability")
                .build();
        return task.run(this::stabilityCheck, lastSequence);
    }

    /**
     * Probes the server by requesting its version and, if that works, re-opens
     * the changes feed from {@code lastSequence}.
     *
     * @param lastSequence sequence to resume the feed from; may be {@code null}
     * @return {@code true} if both steps completed, {@code false} if either threw
     */
    private boolean stabilityCheck(String lastSequence) {
        try {
            this.couchClient.context().serverVersion();
            this.initChanges(lastSequence);
            return true;
        } catch (Exception e) {
            LOG.debug("Failed to get CouchDb server version and/or reset change listener", e);
            return false;
        }
    }

    /**
     * Requests termination of the tracker: marks it as stopped so that no
     * recovery is attempted afterwards, and stops the current feed if one has
     * been opened. Safe to call before {@link #run()} has started.
     */
    public void stop() {
        this.stopped = true;
        Changes current = this.changes;
        if (current != null) {
            current.stop();
        }
    }
}

