/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.MongoException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import io.debezium.connector.mongodb.CollectionId;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbOffsetContext;
import io.debezium.connector.mongodb.MongoDbPartition;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.MongoUtils;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor;
import io.debezium.connector.mongodb.events.SplitEventHandler;
import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics;
import io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter;
import io.debezium.function.BlockingConsumer;
import io.debezium.function.BlockingRunnable;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.util.concurrent.TimeUnit;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbStreamingChangeEventSource
implements StreamingChangeEventSource<MongoDbPartition, MongoDbOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStreamingChangeEventSource.class);
    private final MongoDbConnectorConfig connectorConfig;
    private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final MongoDbTaskContext taskContext;
    private final MongoDbStreamingChangeEventSourceMetrics streamingMetrics;
    private final SnapshotterService snapshotterService;
    private MongoDbOffsetContext effectiveOffset;

    public MongoDbStreamingChangeEventSource(MongoDbConnectorConfig connectorConfig, MongoDbTaskContext taskContext, EventDispatcher<MongoDbPartition, CollectionId> dispatcher, ErrorHandler errorHandler, Clock clock, MongoDbStreamingChangeEventSourceMetrics streamingMetrics, SnapshotterService snapshotterService) {
        this.connectorConfig = connectorConfig;
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.taskContext = taskContext;
        this.streamingMetrics = streamingMetrics;
        this.snapshotterService = snapshotterService;
    }

    public void init(MongoDbOffsetContext offsetContext) {
        this.effectiveOffset = offsetContext == null ? this.emptyOffsets(this.connectorConfig) : offsetContext;
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext context, MongoDbPartition partition, MongoDbOffsetContext offsetContext) {
        if (!this.snapshotterService.getSnapshotter().shouldStream()) {
            LOGGER.info("Streaming is not enabled in configuration");
            return;
        }
        try (MongoDbConnection mongo = this.taskContext.getConnection(this.dispatcher, partition);){
            mongo.execute("Reading change stream", (BlockingConsumer<MongoClient>)((BlockingConsumer)client -> this.readChangeStream((MongoClient)client, context, partition)));
        }
        catch (Throwable t) {
            LOGGER.error("Streaming failed", t);
            this.errorHandler.setProducerThrowable(t);
        }
    }

    public MongoDbOffsetContext getOffsetContext() {
        return this.effectiveOffset;
    }

    private void readChangeStream(MongoClient client, ChangeEventSource.ChangeEventSourceContext context, MongoDbPartition partition) {
        LOGGER.info("Reading change stream");
        SplitEventHandler splitHandler = new SplitEventHandler();
        ChangeStreamIterable<BsonDocument> stream = this.initChangeStream(client, this.effectiveOffset);
        try (BufferingChangeStreamCursor<BsonDocument> cursor = BufferingChangeStreamCursor.fromIterable(stream, this.taskContext, this.streamingMetrics, this.clock).start();){
            while (context.isRunning()) {
                this.waitWhenStreamingPaused(context);
                Object resumableEvent = cursor.tryNext();
                if (resumableEvent == null) continue;
                StreamStatus result = ((BufferingChangeStreamCursor.ResumableChangeStreamEvent)resumableEvent).document.map(doc -> this.processChangeStreamDocument((ChangeStreamDocument<BsonDocument>)doc, splitHandler, partition, this.effectiveOffset)).orElseGet(() -> this.lambda$readChangeStream$3((BufferingChangeStreamCursor.ResumableChangeStreamEvent)resumableEvent, partition));
                if (result != StreamStatus.ERROR) continue;
                return;
            }
        }
        catch (MongoException e) {
            LOGGER.error("Error while reading change stream", (Throwable)e);
            this.errorHandler.setProducerThrowable((Throwable)e);
        }
    }

    private void waitWhenStreamingPaused(ChangeEventSource.ChangeEventSourceContext context) {
        if (context.isPaused()) {
            this.errorHandled(() -> {
                LOGGER.info("Streaming will now pause");
                context.streamingPaused();
                context.waitSnapshotCompletion();
                LOGGER.info("Streaming resumed");
            });
        }
    }

    private StreamStatus processChangeStreamDocument(ChangeStreamDocument<BsonDocument> document, SplitEventHandler<BsonDocument> splitHandler, MongoDbPartition partition, MongoDbOffsetContext offsetContext) {
        LOGGER.trace("Arrived Change Stream event: {}", document);
        return splitHandler.handle(document).map(event -> this.errorHandled(() -> this.dispatchChangeEvent((ChangeStreamDocument<BsonDocument>)event, partition, offsetContext))).orElse(StreamStatus.NEXT);
    }

    private void dispatchChangeEvent(ChangeStreamDocument<BsonDocument> event, MongoDbPartition partition, MongoDbOffsetContext offsetContext) throws InterruptedException {
        CollectionId collectionId = new CollectionId(event.getNamespace().getDatabaseName(), event.getNamespace().getCollectionName());
        MongoDbChangeRecordEmitter emitter = new MongoDbChangeRecordEmitter(partition, (OffsetContext)offsetContext, this.clock, event, this.connectorConfig);
        offsetContext.changeStreamEvent(event);
        this.dispatcher.dispatchDataChangeEvent((Partition)partition, (DataCollectionId)collectionId, (ChangeRecordEmitter)emitter);
    }

    private void dispatchHeartbeatEvent(BufferingChangeStreamCursor.ResumableChangeStreamEvent<BsonDocument> event, MongoDbPartition partition, MongoDbOffsetContext offsetContext) throws InterruptedException {
        LOGGER.trace("No Change Stream event arrived");
        offsetContext.noEvent(event);
        this.dispatcher.dispatchHeartbeatEvent((Partition)partition, (OffsetContext)offsetContext);
    }

    private StreamStatus errorHandled(BlockingRunnable action) {
        try {
            action.run();
            return StreamStatus.DISPATCHED;
        }
        catch (InterruptedException e) {
            LOGGER.info("Replicator thread is interrupted");
            Thread.currentThread().interrupt();
            return StreamStatus.ERROR;
        }
        catch (Exception e) {
            this.errorHandler.setProducerThrowable((Throwable)e);
            return StreamStatus.ERROR;
        }
    }

    protected ChangeStreamIterable<BsonDocument> initChangeStream(MongoClient client, MongoDbOffsetContext offsetContext) {
        ChangeStreamIterable<BsonDocument> stream = MongoUtils.openChangeStream(client, this.taskContext);
        if (this.connectorConfig.getCaptureMode().isFullUpdate()) {
            if (this.connectorConfig.getCaptureModeFullUpdateType().isPostImage()) {
                stream.fullDocument(FullDocument.WHEN_AVAILABLE);
            } else {
                stream.fullDocument(FullDocument.UPDATE_LOOKUP);
            }
        }
        if (this.connectorConfig.getCaptureMode().isIncludePreImage()) {
            stream.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);
        }
        if (offsetContext.lastResumeToken() != null) {
            LOGGER.info("Resuming streaming from token '{}'", (Object)offsetContext.lastResumeToken());
            stream.resumeAfter(offsetContext.lastResumeTokenDoc());
        } else if (offsetContext.lastTimestamp() != null) {
            LOGGER.info("Resuming streaming from operation time '{}'", (Object)offsetContext.lastTimestamp());
            stream.startAtOperationTime(offsetContext.lastTimestamp());
        }
        if (this.connectorConfig.getCursorMaxAwaitTime() > 0) {
            stream.maxAwaitTime((long)this.connectorConfig.getCursorMaxAwaitTime(), TimeUnit.MILLISECONDS);
        }
        return stream;
    }

    protected MongoDbOffsetContext emptyOffsets(MongoDbConnectorConfig connectorConfig) {
        LOGGER.info("Initializing empty Offset context");
        return MongoDbOffsetContext.empty(connectorConfig);
    }

    private /* synthetic */ StreamStatus lambda$readChangeStream$3(BufferingChangeStreamCursor.ResumableChangeStreamEvent resumableEvent, MongoDbPartition partition) {
        return this.errorHandled(() -> this.dispatchHeartbeatEvent(resumableEvent, partition, this.effectiveOffset));
    }

    protected static enum StreamStatus {
        DISPATCHED,
        NEXT,
        ERROR;

    }
}

