/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.mongodb;

import com.mongodb.CursorType;
import com.mongodb.MongoCursorNotFoundException;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.mongodb.CamelMongoDbException;
import org.apache.camel.component.mongodb.MongoAbstractConsumerThread;
import org.apache.camel.component.mongodb.MongoDbEndpoint;
import org.apache.camel.component.mongodb.MongoDbTailTrackingManager;
import org.apache.camel.component.mongodb.MongoDbTailableCursorConsumer;
import org.bson.Document;
import org.bson.conversions.Bson;

class MongoDbTailingThread
extends MongoAbstractConsumerThread {
    private static final String CAPPED_KEY = "capped";
    private MongoDbTailTrackingManager tailTracking;

    MongoDbTailingThread(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer, MongoDbTailTrackingManager tailTrack) {
        super(endpoint, (Consumer)consumer);
        this.tailTracking = tailTrack;
    }

    @Override
    protected void init() {
        if (this.log.isInfoEnabled()) {
            this.log.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", (Object)String.format("db: %s, col: %s", this.endpoint.getMongoDatabase(), this.endpoint.getCollection()));
        }
        if (Boolean.FALSE.equals(this.isCollectionCapped())) {
            throw new CamelMongoDbException(String.format("Tailable cursors are only compatible with capped collections, and collection %s is not capped", this.endpoint.getCollection()));
        }
        try {
            this.tailTracking.recoverFromStore();
            this.cursor = this.initializeCursor();
        }
        catch (Exception e) {
            throw new CamelMongoDbException("Exception occurred while initializing tailable cursor", e);
        }
        if (this.cursor == null) {
            throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
        }
    }

    private Boolean isCollectionCapped() {
        return this.endpoint.getMongoDatabase().runCommand((Bson)this.createCollStatsCommand()).getBoolean((Object)CAPPED_KEY);
    }

    private Document createCollStatsCommand() {
        return new Document("collStats", (Object)this.endpoint.getCollection());
    }

    @Override
    protected MongoCursor<Document> initializeCursor() {
        MongoCursor iterator;
        Object lastVal = this.tailTracking.lastVal;
        MongoCursor answer = lastVal == null ? this.dbCol.find().cursorType(CursorType.TailableAwait).iterator() : (iterator = this.dbCol.find(Filters.gt((String)this.tailTracking.getIncreasingFieldName(), (Object)lastVal)).cursorType(CursorType.TailableAwait).iterator());
        return answer;
    }

    @Override
    protected void regeneratingCursor() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Regenerating cursor with lastVal: {}, waiting {} ms first", this.tailTracking.lastVal, (Object)this.cursorRegenerationDelay);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doRun() {
        try {
            while (this.cursor.hasNext() && this.keepRunning) {
                Document dbObj = (Document)this.cursor.next();
                Exchange exchange = this.createMongoDbExchange(dbObj);
                try {
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("Sending exchange: {}, ObjectId: {}", (Object)exchange, dbObj.get((Object)"_id"));
                    }
                    this.consumer.getProcessor().process(exchange);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                this.tailTracking.setLastVal(dbObj);
            }
        }
        catch (MongoCursorNotFoundException e) {
            if (this.keepRunning) {
                this.log.debug("Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", (Throwable)e);
            }
        }
        catch (IllegalStateException e) {
            this.log.info("Cursor was closed, likely the consumer was stopped and closed the cursor on purpose.", (Throwable)e);
            if (this.cursor != null) {
                this.cursor.close();
            }
            this.keepRunning = false;
        }
        finally {
            this.tailTracking.persistToStore();
        }
    }

    Exchange createMongoDbExchange(Document dbObj) {
        Exchange exchange = this.consumer.createExchange(true);
        Message message = exchange.getIn();
        message.setHeader("CamelMongoDbDatabase", (Object)this.endpoint.getDatabase());
        message.setHeader("CamelMongoDbCollection", (Object)this.endpoint.getCollection());
        message.setHeader("CamelMongoDbTailable", (Object)true);
        message.setBody((Object)dbObj);
        return exchange;
    }
}

