/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch;

import com.mongodb.client.model.changestream.OperationType;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.util.LoggingContext;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect.MongodbDialect;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamDescriptor;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.BsonUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonType;
import org.bson.BsonValue;

/**
 * {@link FetchTask.Context} implementation used by the MongoDB CDC fetch tasks.
 *
 * <p>It owns the {@link ChangeEventQueue} that fetch tasks publish into and provides the
 * record-level helpers (table id, offset, split-range membership, snapshot normalization)
 * that the reader needs while merging change-stream records into a snapshot split.
 */
public class MongodbFetchTaskContext implements FetchTask.Context {
    // Field names of the change-stream record structure handled by this class.
    private static final String ID_FIELD = "_id";
    private static final String OPERATION_TYPE_FIELD = "operationType";
    private static final String NS_FIELD = "ns";
    private static final String DB_FIELD = "db";
    private static final String COLL_FIELD = "coll";
    private static final String DOCUMENT_KEY_FIELD = "documentKey";
    private static final String FULL_DOCUMENT_FIELD = "fullDocument";
    private static final String TS_MS_FIELD = "ts_ms";
    private static final String SOURCE_FIELD = "source";
    private static final String SNAPSHOT_FIELD = "snapshot";
    private static final String SNAPSHOT_TRUE = "true";
    private static final String INSERT_OPERATION = "insert";

    private final MongodbDialect dialect;
    private final MongodbSourceConfig sourceConfig;
    private final ChangeStreamDescriptor changeStreamDescriptor;
    /** Created by {@link #configure(SourceSplitBase)}; {@code null} until then. */
    private ChangeEventQueue<DataChangeEvent> changeEventQueue;

    /**
     * @param dialect the MongoDB dialect exposed through {@link #getDialect()}
     * @param sourceConfig the source configuration used to size the queue and to obtain a client
     * @param changeStreamDescriptor the descriptor exposed through {@link #getChangeStreamDescriptor()}
     */
    public MongodbFetchTaskContext(MongodbDialect dialect, MongodbSourceConfig sourceConfig, ChangeStreamDescriptor changeStreamDescriptor) {
        this.dialect = dialect;
        this.sourceConfig = sourceConfig;
        this.changeStreamDescriptor = changeStreamDescriptor;
    }

    /**
     * Builds the change event queue for the given split.
     *
     * <p>Snapshot splits get a queue capped at {@link Integer#MAX_VALUE}; any other split is
     * capped at the configured batch size.
     *
     * @param sourceSplitBase the split about to be processed
     */
    @Override
    public void configure(@Nonnull SourceSplitBase sourceSplitBase) {
        int queueSize = sourceSplitBase.isSnapshotSplit() ? Integer.MAX_VALUE : this.sourceConfig.getBatchSize();
        // Parameterize the builder explicitly; a raw Builder makes the assignment unchecked.
        this.changeEventQueue = new ChangeEventQueue.Builder<DataChangeEvent>()
                .pollInterval(Duration.ofMillis(this.sourceConfig.getPollAwaitTimeMillis()))
                .maxBatchSize(this.sourceConfig.getPollMaxBatchSize())
                .maxQueueSize(queueSize)
                .loggingContextSupplier(() -> LoggingContext.forConnector("mongodb-cdc", "mongodb-cdc-connector", "mongodb-cdc-connector-task"))
                .build();
    }

    /** @return the source configuration this context was created with */
    public MongodbSourceConfig getSourceConfig() {
        return this.sourceConfig;
    }

    /** @return the dialect this context was created with */
    public MongodbDialect getDialect() {
        return this.dialect;
    }

    /** @return the change stream descriptor this context was created with */
    public ChangeStreamDescriptor getChangeStreamDescriptor() {
        return this.changeStreamDescriptor;
    }

    /** @return the queue built by {@link #configure(SourceSplitBase)}, or {@code null} if not configured yet */
    @Override
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.changeEventQueue;
    }

    /** Delegates to {@link MongodbRecordUtils#getTableId(SourceRecord)}. */
    @Override
    public TableId getTableId(SourceRecord record) {
        return MongodbRecordUtils.getTableId(record);
    }

    /** @return a filter that includes every table */
    @Override
    public Tables.TableFilter getTableFilter() {
        return Tables.TableFilter.includeAll();
    }

    /** @return always {@code true} */
    @Override
    public boolean isExactlyOnce() {
        return true;
    }

    /** @return a {@link ChangeStreamOffset} wrapping the resume token extracted from {@code record} */
    @Override
    public Offset getStreamOffset(SourceRecord record) {
        return new ChangeStreamOffset(MongodbRecordUtils.getResumeToken(record));
    }

    /** Delegates to {@link MongodbRecordUtils#isDataChangeRecord(SourceRecord)}. */
    @Override
    public boolean isDataChangeRecord(SourceRecord record) {
        return MongodbRecordUtils.isDataChangeRecord(record);
    }

    /**
     * Tells whether the document key of {@code record} falls inside a split's key range.
     *
     * <p>The split key name is the first key of the document at {@code splitStart[0]}; the lower
     * and upper bounds are read under that name from {@code splitStart[1]} and
     * {@code splitEnd[1]}. A range bounded by MIN_KEY and MAX_KEY accepts every record.
     *
     * @param record the change record to test
     * @param splitStart split start descriptor; elements 0 and 1 must be {@link BsonDocument}s
     * @param splitEnd split end descriptor; element 1 must be a {@link BsonDocument}
     * @return {@code true} if the record belongs to the split
     */
    @Override
    public boolean isRecordBetween(SourceRecord record, @Nonnull Object[] splitStart, @Nonnull Object[] splitEnd) {
        BsonDocument documentKey = MongodbRecordUtils.getDocumentKey(record);
        BsonDocument splitKeys = (BsonDocument) splitStart[0];
        String firstKey = splitKeys.getFirstKey();
        BsonValue keyValue = documentKey.get(firstKey);
        BsonValue lowerBound = ((BsonDocument) splitStart[1]).get(firstKey);
        BsonValue upperBound = ((BsonDocument) splitEnd[1]).get(firstKey);
        if (isFullRange(lowerBound, upperBound)) {
            return true;
        }
        return isValueInRange(lowerBound, keyValue, upperBound);
    }

    /** @return {@code true} when the bounds are exactly MIN_KEY and MAX_KEY */
    private static boolean isFullRange(@Nonnull BsonValue lowerBound, BsonValue upperBound) {
        return lowerBound.getBsonType() == BsonType.MIN_KEY && upperBound.getBsonType() == BsonType.MAX_KEY;
    }

    /**
     * Checks {@code lowerBound <= value < upperBound} using {@link BsonUtils#compareBsonValue}.
     * NOTE(review): assumes compareBsonValue follows the usual comparator sign convention - confirm.
     */
    private static boolean isValueInRange(BsonValue lowerBound, BsonValue value, BsonValue upperBound) {
        return BsonUtils.compareBsonValue(lowerBound, value) <= 0 && BsonUtils.compareBsonValue(value, upperBound) < 0;
    }

    /**
     * Applies one change record to the buffered snapshot output, keyed by record key.
     *
     * <ul>
     *   <li>insert: the record is stored as-is;</li>
     *   <li>update / replace: the record is rebuilt as an insert-shaped snapshot record carrying
     *       the full document; it is skipped when no full document can be extracted;</li>
     *   <li>delete: the entry for the key is removed.</li>
     * </ul>
     * Records with a {@code null} value are ignored.
     *
     * @param outputBuffer the buffer to update in place
     * @param changeRecord the change record to apply
     * @throws MongodbConnectorException if the operation type is none of the above
     */
    @Override
    public void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, @Nonnull SourceRecord changeRecord) {
        Struct key = (Struct) changeRecord.key();
        Struct value = (Struct) changeRecord.value();
        if (value == null) {
            return;
        }
        String operationType = value.getString(OPERATION_TYPE_FIELD);
        switch (OperationType.fromString(operationType)) {
            case INSERT: {
                outputBuffer.put(key, changeRecord);
                break;
            }
            case UPDATE:
            case REPLACE: {
                Schema valueSchema = changeRecord.valueSchema();
                BsonDocument fullDocument = MongodbRecordUtils.extractBsonDocument(value, valueSchema, FULL_DOCUMENT_FIELD);
                if (fullDocument == null) {
                    // Nothing to store without the post-image of the document.
                    break;
                }
                BsonDocument valueDocument = this.normalizeSnapshotDocument(fullDocument, value);
                SourceRecord record = MongodbRecordUtils.buildSourceRecord(changeRecord.sourcePartition(), changeRecord.sourceOffset(), changeRecord.topic(), changeRecord.kafkaPartition(), changeRecord.keySchema(), changeRecord.key(), valueDocument);
                outputBuffer.put(key, record);
                break;
            }
            case DELETE: {
                outputBuffer.remove(key);
                break;
            }
            default: {
                throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Data change record meet UNKNOWN operation: " + operationType);
            }
        }
    }

    /**
     * Stamps every record as a snapshot record and returns them as a list.
     *
     * <p>Each record's value struct is mutated in place: its {@code source} field is replaced by
     * a fresh struct with {@code ts_ms = 0} and {@code snapshot = "true"}.
     *
     * @param snapshotRecords the records to stamp
     * @return a list holding the same (now mutated) record instances, in iteration order
     */
    @Override
    public List<SourceRecord> formatMessageTimestamp(@Nonnull Collection<SourceRecord> snapshotRecords) {
        // map() instead of peek(): the mutation is required behavior, not a debugging hook.
        return snapshotRecords.stream()
                .map(MongodbFetchTaskContext::stampAsSnapshot)
                .collect(Collectors.toList());
    }

    /** Replaces the {@code source} struct of {@code record}'s value with a snapshot marker and returns the record. */
    private static SourceRecord stampAsSnapshot(SourceRecord record) {
        Struct value = (Struct) record.value();
        Struct source = new Struct(value.schema().field(SOURCE_FIELD).schema());
        source.put(TS_MS_FIELD, 0L);
        source.put(SNAPSHOT_FIELD, SNAPSHOT_TRUE);
        value.put(SOURCE_FIELD, source);
        return record;
    }

    /**
     * Builds the insert-shaped snapshot document for an update/replace change record.
     *
     * <p>The result copies the document key (as both {@code _id} and {@code documentKey}), the
     * namespace and {@code ts_ms} from {@code value}, forces {@code operationType} to
     * {@code "insert"}, and marks the source as a snapshot with {@code ts_ms = 0}.
     *
     * @param fullDocument the full document to embed
     * @param value the value struct of the original change record
     * @return the normalized document
     */
    private BsonDocument normalizeSnapshotDocument(@Nonnull BsonDocument fullDocument, Struct value) {
        String documentKey = value.getString(DOCUMENT_KEY_FIELD);
        Struct namespace = value.getStruct(NS_FIELD);

        BsonDocument nsDocument = new BsonDocument(DB_FIELD, new BsonString(namespace.getString(DB_FIELD)))
                .append(COLL_FIELD, new BsonString(namespace.getString(COLL_FIELD)));
        BsonDocument sourceDocument = new BsonDocument(SNAPSHOT_FIELD, new BsonString(SNAPSHOT_TRUE))
                .append(TS_MS_FIELD, new BsonInt64(0L));

        // Field order is preserved deliberately; BsonDocument keeps insertion order.
        return new BsonDocument()
                .append(ID_FIELD, new BsonString(documentKey))
                .append(OPERATION_TYPE_FIELD, new BsonString(INSERT_OPERATION))
                .append(NS_FIELD, nsDocument)
                .append(DOCUMENT_KEY_FIELD, new BsonString(documentKey))
                .append(FULL_DOCUMENT_FIELD, fullDocument)
                .append(TS_MS_FIELD, new BsonInt64(value.getInt64(TS_MS_FIELD)))
                .append(SOURCE_FIELD, sourceDocument);
    }

    /**
     * Registers a JVM shutdown hook that obtains a client for this context's configuration via
     * {@link MongodbUtils#createMongoClient} and closes it. Nothing is closed at call time.
     */
    @Override
    public void close() {
        // Capture only the config so the hook thread does not keep this context (and its queue)
        // reachable until JVM exit.
        final MongodbSourceConfig config = this.sourceConfig;
        // NOTE(review): a new hook is registered on every call and never removed; if
        // createMongoClient returns a shared/cached client (not visible here - confirm), a
        // single process-wide hook would be sufficient.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> MongodbUtils.createMongoClient(config).close()));
    }
}

