/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.txmetadata;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.pipeline.txmetadata.TransactionInfo;
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Loggings;
import java.time.Instant;
import java.util.Objects;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks transaction boundaries seen in the event stream and, when the connector
 * configuration asks for transaction metadata, does two things:
 * <ul>
 *   <li>sends begin/end transaction records to {@code topicName} through the supplied sender;</li>
 *   <li>adds a {@code "transaction"} block to the value of each data event that carries a
 *       transaction id.</li>
 * </ul>
 * Every public entry point is a no-op when
 * {@link CommonConnectorConfig#shouldProvideTransactionMetadata()} returns {@code false}.
 */
@NotThreadSafe
public class TransactionMonitor {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionMonitor.class);
    public static final Schema TRANSACTION_BLOCK_SCHEMA = SchemaFactory.get().transactionBlockSchema();
    private final EventMetadataProvider eventMetadataProvider;
    private final String topicName;
    private final BlockingConsumer<SourceRecord> sender;
    private final CommonConnectorConfig connectorConfig;
    private final TransactionStructMaker transactionStructMaker;
    protected final Schema transactionKeySchema;
    // Kept as an instance field (not static) so the member subclasses see stays unchanged.
    protected final String DEBEZIUM_TRANSACTION_ID_KEY = "id";

    /**
     * Creates a monitor that sends transaction records to {@code topicName} via {@code sender}.
     *
     * @param connectorConfig connector configuration; supplies the transaction struct maker and
     *            the flag that enables transaction metadata
     * @param eventMetadataProvider source of transaction id, transaction info and event timestamp
     * @param schemaNameAdjuster not used by this constructor
     * @param sender consumer that receives every transaction record built by this monitor
     * @param topicName topic set on every transaction record
     * @throws NullPointerException if {@code eventMetadataProvider} or {@code connectorConfig} is null
     */
    public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataProvider eventMetadataProvider, SchemaNameAdjuster schemaNameAdjuster, BlockingConsumer<SourceRecord> sender, String topicName) {
        Objects.requireNonNull(eventMetadataProvider, "eventMetadataProvider");
        Objects.requireNonNull(connectorConfig, "connectorConfig");
        this.transactionStructMaker = connectorConfig.getTransactionMetadataFactory().getTransactionStructMaker();
        this.transactionKeySchema = this.transactionStructMaker.getTransactionKeySchema();
        this.topicName = topicName;
        this.eventMetadataProvider = eventMetadataProvider;
        this.sender = sender;
        this.connectorConfig = connectorConfig;
    }

    /**
     * Processes a data event: sends begin/end transaction records when the transaction id
     * reported for the event differs from the state held in the offset's transaction context,
     * then adds the transaction block to {@code value}.
     *
     * @param partition partition whose source partition is set on emitted records
     * @param source data collection the event belongs to
     * @param offset offset context holding the transaction context
     * @param key event key, passed through to the metadata provider
     * @param value event value; receives the {@code "transaction"} field when non-null
     * @throws InterruptedException if the sender is interrupted while accepting a record
     */
    public void dataEvent(Partition partition, DataCollectionId source, OffsetContext offset, Object key, Struct value) throws InterruptedException {
        if (!this.connectorConfig.shouldProvideTransactionMetadata()) {
            return;
        }
        final TransactionContext transactionContext = offset.getTransactionContext();
        final String txId = this.eventMetadataProvider.getTransactionId(source, offset, key, value);
        final TransactionInfo transactionInfo = this.eventMetadataProvider.getTransactionInfo(source, offset, key, value);
        if (txId == null) {
            // Guarded so the summary string is only built when trace logging is on.
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Event '{}' has no transaction id", this.eventMetadataProvider.toSummaryString(source, offset, Loggings.maybeRedactSensitiveData(key), value));
            }
            if (transactionContext.isTransactionInProgress()) {
                LOGGER.trace("Transaction was in progress, executing implicit transaction commit");
                // NOTE(review): unlike the transaction-switch branch below and
                // transactionComittedEvent, this path sends the END record but does not call
                // transactionContext.endTransaction() -- confirm that this is intentional.
                this.endTransaction(partition, offset, this.eventMetadataProvider.getEventTimestamp(source, offset, key, value));
            }
            return;
        }
        if (!transactionContext.isTransactionInProgress()) {
            // First event of a new transaction.
            transactionContext.beginTransaction(transactionInfo);
            this.beginTransaction(partition, offset, this.eventMetadataProvider.getEventTimestamp(source, offset, key, value));
        } else if (!transactionContext.getTransactionId().equals(txId)) {
            // A different transaction id arrived while one was in progress: close the
            // current transaction before opening the new one.
            this.endTransaction(partition, offset, this.eventMetadataProvider.getEventTimestamp(source, offset, key, value));
            transactionContext.endTransaction();
            transactionContext.beginTransaction(transactionInfo);
            this.beginTransaction(partition, offset, this.eventMetadataProvider.getEventTimestamp(source, offset, key, value));
        }
        this.transactionEvent(offset, source, value);
    }

    /**
     * Handles an explicit commit: sends an end-transaction record if a transaction is in
     * progress, then ends the transaction on the offset's transaction context in all cases.
     *
     * @param partition partition whose source partition is set on the emitted record
     * @param offset offset context holding the transaction context
     * @param timestamp timestamp passed to the end-transaction value builder
     * @throws InterruptedException if the sender is interrupted while accepting the record
     */
    public void transactionComittedEvent(Partition partition, OffsetContext offset, Instant timestamp) throws InterruptedException {
        if (!this.connectorConfig.shouldProvideTransactionMetadata()) {
            return;
        }
        if (offset.getTransactionContext().isTransactionInProgress()) {
            this.endTransaction(partition, offset, timestamp);
        }
        offset.getTransactionContext().endTransaction();
    }

    /**
     * Handles an explicit transaction start: begins the transaction on the offset's
     * transaction context and sends a begin-transaction record.
     *
     * @param partition partition whose source partition is set on the emitted record
     * @param transactionInfo information about the transaction being started
     * @param offset offset context holding the transaction context
     * @param timestamp timestamp passed to the begin-transaction value builder
     * @throws InterruptedException if the sender is interrupted while accepting the record
     */
    public void transactionStartedEvent(Partition partition, TransactionInfo transactionInfo, OffsetContext offset, Instant timestamp) throws InterruptedException {
        if (!this.connectorConfig.shouldProvideTransactionMetadata()) {
            return;
        }
        offset.getTransactionContext().beginTransaction(transactionInfo);
        this.beginTransaction(partition, offset, timestamp);
    }

    /**
     * Builds the key of a transaction record by delegating to the transaction struct maker.
     *
     * @param offsetContext offset context handed to the struct maker
     * @return the key struct produced by the struct maker
     */
    protected Struct prepareTxKey(OffsetContext offsetContext) {
        return this.transactionStructMaker.buildTransactionKey(offsetContext);
    }

    /**
     * Builds the value of a begin-transaction record by delegating to the transaction struct maker.
     *
     * @param offsetContext offset context handed to the struct maker
     * @param timestamp timestamp handed to the struct maker
     * @return the value struct produced by the struct maker
     */
    protected Struct prepareTxBeginValue(OffsetContext offsetContext, Instant timestamp) {
        return this.transactionStructMaker.buildBeginTransactionValue(offsetContext, timestamp);
    }

    /**
     * Builds the value of an end-transaction record by delegating to the transaction struct maker.
     *
     * @param offsetContext offset context handed to the struct maker
     * @param timestamp timestamp handed to the struct maker
     * @return the value struct produced by the struct maker
     */
    protected Struct prepareTxEndValue(OffsetContext offsetContext, Instant timestamp) {
        return this.transactionStructMaker.buildEndTransactionValue(offsetContext, timestamp);
    }

    /**
     * Builds the transaction block stored in a data event's value by delegating to the
     * transaction struct maker.
     *
     * @param offsetContext offset context handed to the struct maker
     * @param dataCollectionEventOrder value returned by the transaction context for this event
     * @param value the data event value
     * @return the transaction block struct produced by the struct maker
     */
    protected Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEventOrder, Struct value) {
        return this.transactionStructMaker.addTransactionBlock(offsetContext, dataCollectionEventOrder, value);
    }

    /**
     * Registers the event with the transaction context and, when the event has a value,
     * stores the transaction block in the value's {@code "transaction"} field.
     */
    private void transactionEvent(OffsetContext offsetContext, DataCollectionId source, Struct value) {
        // Registered before the null check so valueless events are still recorded in the context.
        final long dataCollectionEventOrder = offsetContext.getTransactionContext().event(source);
        if (value == null) {
            // The event key is not available here, so the data collection id is logged instead.
            LOGGER.debug("Event for data collection {} without value. Cannot enrich source block.", source);
            return;
        }
        final Struct txStruct = this.prepareTxStruct(offsetContext, dataCollectionEventOrder, value);
        value.put("transaction", txStruct);
    }

    /** Builds and sends a begin-transaction record. */
    private void beginTransaction(Partition partition, OffsetContext offsetContext, Instant timestamp) throws InterruptedException {
        final Struct key = this.prepareTxKey(offsetContext);
        final Struct value = this.prepareTxBeginValue(offsetContext, timestamp);
        this.sendTransactionRecord(partition, offsetContext, key, value);
    }

    /** Builds and sends an end-transaction record. */
    private void endTransaction(Partition partition, OffsetContext offsetContext, Instant timestamp) throws InterruptedException {
        final Struct key = this.prepareTxKey(offsetContext);
        final Struct value = this.prepareTxEndValue(offsetContext, timestamp);
        this.sendTransactionRecord(partition, offsetContext, key, value);
    }

    /** Wraps the key/value pair in a {@link SourceRecord} for {@code topicName} and hands it to the sender. */
    private void sendTransactionRecord(Partition partition, OffsetContext offsetContext, Struct key, Struct value) throws InterruptedException {
        // The null argument is the record's Kafka partition, left unset here.
        this.sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), this.topicName, null, key.schema(), key, value.schema(), value));
    }
}

