/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb.sink;

import com.mongodb.client.MongoClient;
import io.debezium.bindings.kafka.KafkaDebeziumSinkRecord;
import io.debezium.config.Configuration;
import io.debezium.connector.common.DebeziumTaskState;
import io.debezium.connector.common.UUIDUtils;
import io.debezium.connector.mongodb.connection.MongoDbConnectionContext;
import io.debezium.connector.mongodb.sink.Module;
import io.debezium.connector.mongodb.sink.MongoDbChangeEventSink;
import io.debezium.connector.mongodb.sink.MongoDbSinkConnectorConfig;
import io.debezium.dlq.ErrorReporter;
import io.debezium.openlineage.ConnectorContext;
import io.debezium.openlineage.DebeziumOpenLineageEmitter;
import io.debezium.openlineage.dataset.DatasetDataExtractor;
import io.debezium.openlineage.dataset.DatasetMetadata;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbSinkConnectorTask
extends SinkTask {
    static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSinkConnectorTask.class);
    private static final String CONNECTOR_TYPE = "sink";
    private MongoDbChangeEventSink mongoSink;
    private ConnectorContext connectorContext;
    private DatasetDataExtractor datasetDataExtractor;

    public String version() {
        return Module.version();
    }

    public void start(Map<String, String> props) {
        LOGGER.info("Starting MongoDB sink task");
        Configuration config = Configuration.from(props);
        MongoDbSinkConnectorConfig sinkConfig = new MongoDbSinkConnectorConfig(config);
        MongoClient client = null;
        try {
            MongoDbConnectionContext connectionContext = new MongoDbConnectionContext(config);
            client = connectionContext.getMongoClient();
            this.datasetDataExtractor = new DatasetDataExtractor();
            String connectorName = props.get("name");
            String taskId = props.getOrDefault("task.id", "0");
            this.connectorContext = new ConnectorContext(connectorName, Module.name(), taskId, Module.version(), UUIDUtils.generateNewUUID(), this.getMaskedConfigurationMap(props));
            DebeziumOpenLineageEmitter.emit((ConnectorContext)this.connectorContext, (DebeziumTaskState)DebeziumTaskState.INITIAL);
            this.mongoSink = new MongoDbChangeEventSink(sinkConfig, client, this.createErrorReporter(), this.connectorContext);
        }
        catch (RuntimeException taskStartingException) {
            try {
                MongoClient autoCloseableClient = client;
                if (autoCloseableClient != null) {
                    autoCloseableClient.close();
                }
            }
            catch (RuntimeException resourceReleasingException) {
                taskStartingException.addSuppressed(resourceReleasingException);
            }
            throw new ConnectException("Failed to start MongoDB sink task", (Throwable)taskStartingException);
        }
        DebeziumOpenLineageEmitter.emit((ConnectorContext)this.connectorContext, (DebeziumTaskState)DebeziumTaskState.RUNNING);
        LOGGER.debug("Started MongoDB sink task");
    }

    public void put(Collection<SinkRecord> records) {
        try {
            records.forEach(record -> DebeziumOpenLineageEmitter.emit((ConnectorContext)this.connectorContext, (DebeziumTaskState)DebeziumTaskState.RUNNING, List.of(new DatasetMetadata(record.topic(), DatasetMetadata.DatasetKind.INPUT, "STREAM", DatasetMetadata.DataStore.KAFKA, this.datasetDataExtractor.extract((ConnectRecord)record)))));
            this.mongoSink.execute(records);
        }
        catch (Exception e) {
            DebeziumOpenLineageEmitter.emit((ConnectorContext)this.connectorContext, (DebeziumTaskState)DebeziumTaskState.RESTARTING, (Throwable)e);
        }
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        LOGGER.debug("Flush called - noop");
    }

    public void stop() {
        LOGGER.info("Stopping MongoDB sink task");
        if (this.mongoSink != null) {
            this.mongoSink.close();
            DebeziumOpenLineageEmitter.emit((ConnectorContext)this.connectorContext, (DebeziumTaskState)DebeziumTaskState.STOPPED);
            DebeziumOpenLineageEmitter.cleanup((ConnectorContext)this.connectorContext);
        }
    }

    private ErrorReporter createErrorReporter() {
        ErrorReporter result = MongoDbSinkConnectorTask.nopErrorReporter();
        if (this.context != null) {
            try {
                ErrantRecordReporter errantRecordReporter = this.context.errantRecordReporter();
                if (errantRecordReporter != null) {
                    result = (record, e) -> {
                        if (record instanceof KafkaDebeziumSinkRecord) {
                            KafkaDebeziumSinkRecord kafkaRecord = (KafkaDebeziumSinkRecord)record;
                            errantRecordReporter.report(kafkaRecord.getOriginalKafkaRecord(), (Throwable)e);
                        }
                    };
                } else {
                    LOGGER.info("Errant record reporter not configured.");
                }
            }
            catch (NoClassDefFoundError | NoSuchMethodError e2) {
                LOGGER.info("Kafka versions prior to 2.6 do not support the errant record reporter.");
            }
        }
        return result;
    }

    private Map<String, String> getMaskedConfigurationMap(Map<String, String> props) {
        return Configuration.from(props).withMaskedPasswords().asMap();
    }

    static ErrorReporter nopErrorReporter() {
        return (record, e) -> {};
    }
}

