/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer.buffered;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource;
import io.debezium.connector.oracle.logminer.LogFile;
import io.debezium.connector.oracle.logminer.LogMinerChangeRecordEmitter;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.SqlUtils;
import io.debezium.connector.oracle.logminer.TransactionCommitConsumer;
import io.debezium.connector.oracle.logminer.buffered.BufferedLogMinerQueryBuilder;
import io.debezium.connector.oracle.logminer.buffered.CacheProvider;
import io.debezium.connector.oracle.logminer.buffered.LogMinerCache;
import io.debezium.connector.oracle.logminer.buffered.LogMinerTransactionCache;
import io.debezium.connector.oracle.logminer.buffered.Transaction;
import io.debezium.connector.oracle.logminer.buffered.TransactionFactory;
import io.debezium.connector.oracle.logminer.buffered.ehcache.EhcacheCacheProvider;
import io.debezium.connector.oracle.logminer.buffered.ehcache.EhcacheTransactionFactory;
import io.debezium.connector.oracle.logminer.buffered.infinispan.EmbeddedInfinispanCacheProvider;
import io.debezium.connector.oracle.logminer.buffered.infinispan.InfinispanTransactionFactory;
import io.debezium.connector.oracle.logminer.buffered.infinispan.RemoteInfinispanCacheProvider;
import io.debezium.connector.oracle.logminer.buffered.memory.MemoryCacheProvider;
import io.debezium.connector.oracle.logminer.buffered.memory.MemoryTransactionFactory;
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.events.RedoSqlDmlEvent;
import io.debezium.connector.oracle.logminer.events.TruncateEvent;
import io.debezium.connector.oracle.logminer.logwriter.CommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.RacCommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.ReadOnlyLogWriterFlushStrategy;
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Loggings;
import io.debezium.util.Stopwatch;
import io.debezium.util.Strings;
import java.math.BigInteger;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BufferedLogMinerStreamingChangeEventSource
extends AbstractLogMinerStreamingChangeEventSource {
    /** Primary logger for this streaming source. */
    private static final Logger LOGGER = LoggerFactory.getLogger(BufferedLogMinerStreamingChangeEventSource.class);
    /** Separate logger registered under this class's name plus the {@code .AbandonedDetails} suffix. */
    private static final Logger ABANDONED_DETAILS_LOGGER = LoggerFactory.getLogger((String)(BufferedLogMinerStreamingChangeEventSource.class.getName() + ".AbandonedDetails"));
    /** Transaction id suffix that {@code removeEventWithRowId} treats as "transaction has no explicit sequence". */
    private static final String NO_SEQUENCE_TRX_ID_SUFFIX = "ffffffff";
    /** Mining query text, built once in the constructor by {@link BufferedLogMinerQueryBuilder}. */
    private final String queryString;
    /** Supplies the transaction, processed-transaction and schema-change caches; chosen from the configured buffer type. */
    private final CacheProvider<Transaction> cacheProvider;
    /** Creates {@link Transaction} instances for newly observed transactions; chosen from the configured buffer type. */
    private final TransactionFactory<Transaction> transactionFactory;
    /** Change time of the last event seen by {@code preProcessEvent} whose type was not {@code MISSING_SCN}. */
    private Instant lastProcessedScnChangeTime = null;
    /** SCN of the last event seen by {@code preProcessEvent} whose type was not {@code MISSING_SCN}. */
    private Scn lastProcessedScn = Scn.NULL;

    public BufferedLogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, EventDispatcher<OraclePartition, TableId> dispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema, Configuration jdbcConfig, LogMinerStreamingChangeEventSourceMetrics streamingMetrics) {
        super(connectorConfig, jdbcConnection, dispatcher, errorHandler, clock, schema, jdbcConfig, streamingMetrics);
        this.queryString = new BufferedLogMinerQueryBuilder(connectorConfig).getQuery();
        this.cacheProvider = this.createCacheProvider(connectorConfig);
        this.transactionFactory = this.createTransactionFactory(connectorConfig);
    }

    /**
     * Runs the mining loop until the context reports that it is no longer running, or until
     * archive-log-only mode signals that the loop should stop.
     *
     * <p>Each iteration computes the upper SCN bound, flushes the log writer, restarts the mining
     * session when required, mines and processes one batch, and then pauses. If the context is
     * paused at the end of an iteration a blocking snapshot is executed.
     *
     * @throws Exception if any step of the mining loop fails
     */
    @Override
    protected void executeLogMiningStreaming() throws Exception {
        try (LogWriterFlushStrategy flushStrategy = resolveFlushStrategy()) {
            Scn sessionStartScn = getOffsetContext().getScn();
            Scn sessionEndScn = Scn.NULL;
            Scn readScn = sessionStartScn;
            boolean sessionStartScnChanged = false;
            int miningStartAttempts = 1;
            Stopwatch watch = Stopwatch.accumulating().start();

            prepareLogsForMining(false, sessionStartScn);

            while (getContext().isRunning()) {
                if (getConfig().isArchiveLogOnlyMode()
                        && waitForRangeAvailabilityInArchiveLogs(sessionStartScn, sessionEndScn)) {
                    break;
                }

                final Instant batchStartTime = Instant.now();
                updateDatabaseTimeDifference();

                final Scn currentScn = getCurrentScn();
                getMetrics().setCurrentScn(currentScn);

                sessionEndScn = calculateUpperBounds(readScn, sessionEndScn, currentScn);
                if (sessionEndScn.isNull()) {
                    LOGGER.debug("Requested delay of mining by one iteration");
                    pauseBetweenMiningSessions();
                    continue;
                }

                flushStrategy.flush(getCurrentScn());

                final boolean miningSessionRestartRequired = isMiningSessionRestartRequired(watch);
                final boolean logSwitchOccurred = checkLogSwitchOccurredAndUpdate();
                final boolean restartRequired = miningSessionRestartRequired || logSwitchOccurred;

                if (restartRequired || sessionStartScnChanged) {
                    endMiningSession();
                    if (restartRequired) {
                        // Only a genuine restart may also recycle the JDBC connection.
                        if (getConfig().isLogMiningRestartConnection()) {
                            prepareJdbcConnection(true);
                        }
                    }
                    else {
                        LOGGER.debug("SCN start position advanced to a new set of logs, restart required.");
                    }
                    prepareLogsForMining(true, sessionStartScn);
                    sessionStartScnChanged = false;
                    watch = Stopwatch.accumulating().start();
                }

                if (!startMiningSession(sessionStartScn, sessionEndScn, miningStartAttempts)) {
                    ++miningStartAttempts;
                }
                else {
                    miningStartAttempts = 1;

                    final ProcessResult result = process(readScn, sessionEndScn);
                    LOGGER.debug("ProcessResult MineStartScn={} ReadStartScn={}", result.miningSessionStartScn(), result.readStartScn());

                    if (!result.miningSessionStartScn().equals(sessionStartScn)
                            && hasLogMiningStartingLogChanged(sessionStartScn, result)) {
                        sessionStartScnChanged = true;
                        sessionStartScn = result.miningSessionStartScn();
                    }

                    readScn = result.readStartScn();
                    getMetrics().setLastBatchProcessingDuration(Duration.between(batchStartTime, Instant.now()));
                }

                captureJdbcSessionMemoryStatistics();
                pauseBetweenMiningSessions();

                if (getContext().isPaused()) {
                    executeBlockingSnapshot();
                }
            }
        }
    }

    /**
     * Closes the cache provider. Any failure is logged as a warning and not rethrown.
     */
    public void close() {
        try {
            cacheProvider.close();
        }
        catch (Exception closeFailure) {
            LOGGER.warn("Failed to gracefully shutdown the cache provider", closeFailure);
        }
    }

    /**
     * Checks whether the logs containing the next mining start SCN differ from the logs
     * containing the previous session's start SCN.
     *
     * @param lastSessionStartScn the start SCN of the previous mining session
     * @param lastBatchResult the result of the last processed batch, carrying the next start SCN
     * @return {@code true} if the two sets of matching log files are not equal
     */
    private boolean hasLogMiningStartingLogChanged(Scn lastSessionStartScn, ProcessResult lastBatchResult) {
        final List<LogFile> logsForPreviousStart = getCurrentLogFiles().stream()
                .filter(logFile -> logFile.isScnInLogFileRange(lastSessionStartScn))
                .toList();
        final List<LogFile> logsForNextStart = getCurrentLogFiles().stream()
                .filter(logFile -> logFile.isScnInLogFileRange(lastBatchResult.miningSessionStartScn()))
                .toList();
        return !logsForPreviousStart.equals(logsForNextStart);
    }

    /**
     * Selects the log writer flush strategy from the connector configuration: read-only mode
     * takes precedence, then RAC, otherwise the plain commit-based strategy.
     *
     * @return the flush strategy to use for the streaming loop
     */
    private LogWriterFlushStrategy resolveFlushStrategy() {
        if (getConfig().isLogMiningReadOnly()) {
            return new ReadOnlyLogWriterFlushStrategy();
        }
        final boolean racSystem = getConfig().isRacSystem();
        if (!racSystem) {
            return new CommitLogWriterFlushStrategy(getConfig(), getConnection());
        }
        return new RacCommitLogWriterFlushStrategy(getConfig(), getJdbcConfiguration(), getMetrics());
    }

    /**
     * Creates the cache provider that matches the configured LogMiner buffer type.
     *
     * @param connectorConfig the connector configuration
     * @param <T> the transaction type held by the cache
     * @return a new cache provider instance
     */
    private <T extends Transaction> CacheProvider<T> createCacheProvider(OracleConnectorConfig connectorConfig) {
        return switch (connectorConfig.getLogMiningBufferType()) {
            case MEMORY -> new MemoryCacheProvider(connectorConfig);
            case INFINISPAN_EMBEDDED -> new EmbeddedInfinispanCacheProvider(connectorConfig);
            case INFINISPAN_REMOTE -> new RemoteInfinispanCacheProvider(connectorConfig);
            case EHCACHE -> new EhcacheCacheProvider(connectorConfig);
            default -> throw new IncompatibleClassChangeError();
        };
    }

    /**
     * Creates the transaction factory that matches the configured LogMiner buffer type. Both
     * Infinispan buffer types share the same factory.
     *
     * @param connectorConfig the connector configuration
     * @param <T> the transaction type produced by the factory
     * @return a new transaction factory instance
     */
    private <T extends Transaction> TransactionFactory<T> createTransactionFactory(OracleConnectorConfig connectorConfig) {
        return switch (connectorConfig.getLogMiningBufferType()) {
            case MEMORY -> new MemoryTransactionFactory();
            case INFINISPAN_EMBEDDED, INFINISPAN_REMOTE -> new InfinispanTransactionFactory();
            case EHCACHE -> new EhcacheTransactionFactory();
            default -> throw new IncompatibleClassChangeError();
        };
    }

    /**
     * Mines one batch: resets the batch metrics, runs the mining query for the given SCN range,
     * processes the returned rows and computes where the next mining session should start.
     *
     * @param startScn the lower SCN bound bound to the query
     * @param endScn the upper SCN bound bound to the query
     * @return the start positions for the next mining session and the next read
     * @throws SQLException if preparing or executing the query fails
     * @throws InterruptedException if processing is interrupted
     */
    protected ProcessResult process(Scn startScn, Scn endScn) throws SQLException, InterruptedException {
        getBatchMetrics().reset();

        try (PreparedStatement statement = createQueryStatement()) {
            LOGGER.debug("Fetching results for SCN [{}, {}]", startScn, endScn);

            statement.setFetchSize(getConfig().getQueryFetchSize());
            statement.setFetchDirection(ResultSet.FETCH_FORWARD);
            statement.setString(1, startScn.toString());
            statement.setString(2, endScn.toString());
            if (getConfig().isLogMiningUseCteQuery()) {
                // The CTE form of the query takes the same SCN range a second time.
                statement.setString(3, startScn.toString());
                statement.setString(4, endScn.toString());
            }

            getMetrics().setLastMiningFetchRange(startScn, endScn);

            executeAndProcessQuery(statement);
            logActiveTransactions();

            return calculateNewStartScn(startScn, endScn, getOffsetContext().getCommitScn().getMaxCommittedScn());
        }
    }

    /**
     * @return the transaction cache supplied by the cache provider
     */
    protected LogMinerTransactionCache<Transaction> getTransactionCache() {
        final LogMinerTransactionCache<Transaction> transactions = cacheProvider.getTransactionCache();
        return transactions;
    }

    /**
     * @return the processed-transactions cache supplied by the cache provider
     */
    protected LogMinerCache<String, String> getProcessedTransactionsCache() {
        final LogMinerCache<String, String> processedTransactions = cacheProvider.getProcessedTransactionsCache();
        return processedTransactions;
    }

    /**
     * @return the schema-changes cache supplied by the cache provider
     */
    protected LogMinerCache<String, String> getSchemaChangesCache() {
        final LogMinerCache<String, String> schemaChanges = cacheProvider.getSchemaChangesCache();
        return schemaChanges;
    }

    /**
     * @param transactionId the transaction identifier to look up
     * @return {@code true} if the processed-transactions cache contains the identifier
     */
    private boolean isRecentlyProcessed(String transactionId) {
        final LogMinerCache<String, String> processedTransactions = getProcessedTransactionsCache();
        return processedTransactions.containsKey(transactionId);
    }

    /**
     * @param event the schema change row
     * @return {@code true} if the schema-changes cache has an entry keyed by the row's SCN
     */
    private boolean hasSchemaChangeBeenSeen(LogMinerEventRow event) {
        final String scnKey = event.getScn().toString();
        return getSchemaChangesCache().containsKey(scnKey);
    }

    /**
     * @param transaction the transaction to inspect
     * @return the event count the transaction cache reports for the transaction
     */
    private int getTransactionEventCount(Transaction transaction) {
        final LogMinerTransactionCache<Transaction> transactions = getTransactionCache();
        return transactions.getTransactionEventCount(transaction);
    }

    /**
     * Prepares the mining query as a forward-only, read-only statement whose cursors are held
     * over commit, and applies the configured query timeout (in seconds).
     *
     * @return the prepared statement; the caller is responsible for closing it
     * @throws SQLException if the statement cannot be prepared or configured
     */
    protected PreparedStatement createQueryStatement() throws SQLException {
        final PreparedStatement statement = getConnection().connection().prepareStatement(
                queryString,
                ResultSet.TYPE_FORWARD_ONLY,
                ResultSet.CONCUR_READ_ONLY,
                ResultSet.HOLD_CURSORS_OVER_COMMIT);
        statement.setQueryTimeout((int) getConnection().config().getQueryTimeout().toSeconds());
        return statement;
    }

    /**
     * Delegates to the parent and then remembers the SCN and change time of the row, unless the
     * row's event type is {@code MISSING_SCN}.
     *
     * @param event the row about to be processed
     */
    @Override
    protected void preProcessEvent(LogMinerEventRow event) {
        super.preProcessEvent(event);
        if (EventType.MISSING_SCN.equals(event.getEventType())) {
            return;
        }
        lastProcessedScn = event.getScn();
        lastProcessedScnChangeTime = event.getChangeTime();
    }

    /**
     * Decides whether a row should be skipped.
     *
     * <p>Rows with a table id are skipped when they target the flush table or when
     * {@code isNonSchemaChangeEventSkipped} says so. Any row whose cached transaction is over
     * the event threshold causes that transaction to be abandoned and the row to be skipped.
     *
     * @param event the row to evaluate
     * @return {@code true} if the row must not be processed further
     */
    @Override
    protected boolean isEventSkipped(LogMinerEventRow event) {
        if (event.getTableId() != null) {
            final boolean flushTableChange = LogWriterFlushStrategy.isFlushTable(
                    event.getTableId(),
                    getConfig().getJdbcConfig().getUser(),
                    getConfig().getLogMiningFlushTableName());
            if (flushTableChange) {
                LOGGER.trace("Skipped change associated with flush table '{}'", event.getTableId());
                return true;
            }
            if (isNonSchemaChangeEventSkipped(event)) {
                return true;
            }
        }

        final Transaction cachedTransaction = getTransactionCache().getTransaction(event.getTransactionId());
        if (cachedTransaction == null || !isTransactionOverEventThreshold(cachedTransaction)) {
            return false;
        }
        abandonTransactionOverEventThreshold(cachedTransaction);
        return true;
    }

    /**
     * Handles a transaction START row. Transactions already in the processed cache are ignored;
     * an unknown transaction is created and cached; a transaction that is already cached is
     * reset to its start.
     *
     * @param event the START row
     */
    @Override
    protected void handleStartEvent(LogMinerEventRow event) {
        final String transactionId = event.getTransactionId();
        if (isRecentlyProcessed(transactionId)) {
            return;
        }

        final Transaction existing = getTransactionCache().getTransaction(transactionId);
        if (existing != null) {
            LOGGER.trace("Transaction {} is not yet committed and START event detected.", transactionId);
            getTransactionCache().resetTransactionToStart(existing);
            return;
        }

        getTransactionCache().addTransaction(transactionFactory.createTransaction(event));
        getMetrics().setActiveTransactionCount(getTransactionCache().getTransactionCount());
    }

    /**
     * Handles a COMMIT row for a buffered transaction.
     *
     * <p>Flow, as implemented below:
     * <ol>
     * <li>Return immediately if the transaction id is in the processed-transactions cache.</li>
     * <li>Remove the transaction from the transaction cache (it may be absent).</li>
     * <li>If the offset's commit SCN reports this row as already handled, clean up and return.</li>
     * <li>If the row's thread is 0 and there are no events, clean up and return without recording a commit.</li>
     * <li>Otherwise replay the buffered events through a {@link TransactionCommitConsumer}, updating the
     * offset context per event and dispatching data change events unless the transaction is skipped at commit.</li>
     * <li>Record the commit in the offsets, reset per-event offset fields, dispatch either a
     * transaction-committed event or a heartbeat, and update metrics.</li>
     * </ol>
     *
     * @param row the COMMIT row
     * @throws InterruptedException declared by this method; presumably raised by the dispatcher calls — TODO confirm
     */
    @Override
    protected void handleCommitEvent(LogMinerEventRow row) throws InterruptedException {
        String transactionId = row.getTransactionId();
        if (this.isRecentlyProcessed(transactionId)) {
            LOGGER.debug("\tTransaction is already committed, skipped.");
            return;
        }
        // The transaction is taken out of the cache up front; every exit path below that has a
        // non-null transaction is responsible for calling cleanupAfterTransactionRemovedFromCache.
        Transaction transaction = this.getTransactionCache().getAndRemoveTransaction(transactionId);
        if (transaction == null) {
            if (!this.getOffsetContext().getCommitScn().hasEventScnBeenHandled(row)) {
                LOGGER.debug("Transaction {} not found in cache with SCN {}, no events to commit.", (Object)transactionId, (Object)row.getScn());
            }
            this.getTransactionCache().removeAbandonedTransaction(row.getTransactionId());
        }
        // Note: calculateSmallestScn() also updates the "oldest SCN" metric as a side effect.
        Scn smallestScn = this.calculateSmallestScn();
        Scn commitScn = row.getScn();
        if (this.getOffsetContext().getCommitScn().hasEventScnBeenHandled(row)) {
            // The offsets say this commit was already handled: nothing is dispatched, only cleanup.
            if (transaction != null) {
                if (transaction.getNumberOfEvents() > 0) {
                    Scn lastCommittedScn = this.getOffsetContext().getCommitScn().getCommitScnForRedoThread(row.getThread());
                    LOGGER.debug("Transaction {} has already been processed. Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.", new Object[]{transactionId, this.getOffsetContext().getCommitScn(), commitScn, lastCommittedScn});
                }
                this.cleanupAfterTransactionRemovedFromCache(transaction, false);
                this.getMetrics().setActiveTransactionCount(this.getTransactionCache().getTransactionCount());
                this.getMetrics().setBufferedEventCount(this.getTransactionCache().getTransactionEvents());
            }
            return;
        }
        int numEvents = transaction == null ? 0 : this.getTransactionEventCount(transaction);
        // NOTE(review): a commit on redo thread 0 with no buffered events is not recorded in the
        // offsets at all; the rationale is not visible in this file — confirm before changing.
        boolean skipCommit = row.getThread() == 0 && numEvents == 0;
        LOGGER.debug("{} transaction {} with {} events (scn: {}, thread: {}, oldest buffer scn: {}): {}", new Object[]{skipCommit ? "Skipping commit for" : "Committing", transactionId, numEvents, row.getScn(), row.getThread(), smallestScn, row});
        if (skipCommit) {
            if (transaction != null) {
                this.cleanupAfterTransactionRemovedFromCache(transaction, false);
            }
            return;
        }
        Instant start = Instant.now();
        boolean dispatchTransactionCommittedEvent = false;
        if (numEvents > 0) {
            // numEvents > 0 implies transaction != null (see the numEvents computation above).
            boolean skipEvents = this.isTransactionSkippedAtCommit(transaction);
            dispatchTransactionCommittedEvent = !skipEvents;
            ZoneOffset databaseOffset = this.getMetrics().getDatabaseOffset();
            // Per-event handler invoked by the commit consumer for each buffered event.
            TransactionCommitConsumer.Handler<LogMinerEvent> delegate = (event, eventIndex, eventsProcessed) -> {
                if (smallestScn.isNull() || commitScn.compareTo(smallestScn) < 0) {
                    this.getMetrics().setOldestScnDetails(event.getScn(), event.getChangeTime());
                }
                // Skip events whose index is at or below the transaction sequence stored in the
                // offsets for this same transaction id.
                if (Objects.equals(this.getOffsetContext().getTransactionId(), transactionId) && this.getOffsetContext().getTransactionSequence() != null && this.getOffsetContext().getTransactionSequence() >= eventIndex) {
                    LOGGER.info("Skipping event {} in transaction {} - has already been sent.", (Object)eventIndex, (Object)transactionId);
                    Loggings.logDebugAndTraceRecord((Logger)LOGGER, (Object)event, (String)"Skipping event {} in transaction {} - has already been sent.", (Object[])new Object[]{eventIndex, transactionId});
                    return;
                }
                // Populate the offset context with this event's details before dispatching.
                this.getOffsetContext().setEventScn(event.getScn());
                this.getOffsetContext().setEventCommitScn(row.getScn());
                this.getOffsetContext().setTransactionId(transactionId);
                this.getOffsetContext().setTransactionSequence(eventIndex);
                this.getOffsetContext().setUserName(transaction.getUserName());
                // Times are shifted by the database offset reported by the metrics.
                this.getOffsetContext().setSourceTime(event.getChangeTime().minusSeconds(databaseOffset.getTotalSeconds()));
                this.getOffsetContext().setTableId(event.getTableId());
                this.getOffsetContext().setRedoThread(row.getThread());
                this.getOffsetContext().setRsId(event.getRsId());
                this.getOffsetContext().setRowId(event.getRowId());
                this.getOffsetContext().setCommitTime(row.getChangeTime().minusSeconds(databaseOffset.getTotalSeconds()));
                if (eventIndex == 1L) {
                    // The first event of the transaction defines the transaction's start SCN/time.
                    this.getOffsetContext().setStartScn(event.getScn());
                    this.getOffsetContext().setStartTime(event.getChangeTime().minusSeconds(databaseOffset.getTotalSeconds()));
                }
                if (event instanceof RedoSqlDmlEvent) {
                    this.getOffsetContext().setRedoSql(((RedoSqlDmlEvent)event).getRedoSql());
                }
                // NOTE(review): unconditional cast — assumes every buffered event is a DmlEvent; confirm.
                DmlEvent dmlEvent = (DmlEvent)event;
                if (!skipEvents) {
                    // Truncate events are emitted with the TRUNCATE envelope operation; all others use the event's own type.
                    LogMinerChangeRecordEmitter logMinerChangeRecordEmitter = dmlEvent instanceof TruncateEvent ? new LogMinerChangeRecordEmitter(this.getConfig(), (Partition)this.getPartition(), (OffsetContext)this.getOffsetContext(), Envelope.Operation.TRUNCATE, dmlEvent.getDmlEntry().getOldValues(), dmlEvent.getDmlEntry().getNewValues(), this.getSchema().tableFor(event.getTableId()), this.getSchema(), Clock.system()) : new LogMinerChangeRecordEmitter(this.getConfig(), (Partition)this.getPartition(), (OffsetContext)this.getOffsetContext(), dmlEvent.getEventType(), dmlEvent.getDmlEntry().getOldValues(), dmlEvent.getDmlEntry().getNewValues(), this.getSchema().tableFor(event.getTableId()), this.getSchema(), Clock.system());
                    this.getEventDispatcher().dispatchDataChangeEvent((Partition)this.getPartition(), (DataCollectionId)event.getTableId(), (ChangeRecordEmitter)logMinerChangeRecordEmitter);
                }
                // Redo SQL is per-event state; clear it so it does not leak into the next event.
                this.getOffsetContext().setRedoSql(null);
            };
            try (TransactionCommitConsumer commitConsumer = new TransactionCommitConsumer(delegate, this.getConfig(), this.getSchema());){
                this.getTransactionCache().forEachEvent(transaction, event -> {
                    // Returning false when the context is no longer running; presumably this stops the iteration — TODO confirm.
                    if (!this.getContext().isRunning()) {
                        return false;
                    }
                    LOGGER.trace("Dispatching event {}", (Object)event.getEventType());
                    commitConsumer.accept((LogMinerEvent)event);
                    return true;
                });
            }
            // If the connector stopped mid-transaction, return without recording the commit.
            if (!this.getContext().isRunning()) {
                return;
            }
        }
        // Record the commit and reset the per-event offset fields before emitting the commit/heartbeat.
        this.getOffsetContext().getCommitScn().recordCommit(row);
        this.getOffsetContext().setEventScn(commitScn);
        this.getOffsetContext().setRsId(row.getRsId());
        this.getOffsetContext().setRowId("");
        this.getOffsetContext().setStartScn(Scn.NULL);
        this.getOffsetContext().setCommitTime(null);
        this.getOffsetContext().setStartTime(null);
        if (dispatchTransactionCommittedEvent) {
            // Only reachable when numEvents > 0, so transaction is non-null here.
            this.getEventDispatcher().dispatchTransactionCommittedEvent((Partition)this.getPartition(), (OffsetContext)this.getOffsetContext(), transaction.getChangeTime());
        } else {
            this.getEventDispatcher().dispatchHeartbeatEvent((Partition)this.getPartition(), (OffsetContext)this.getOffsetContext());
        }
        this.getBatchMetrics().commitObserved();
        if (transaction != null) {
            this.finalizeTransaction(transactionId, commitScn, false);
            this.cleanupAfterTransactionRemovedFromCache(transaction, false);
            this.getMetrics().calculateLagFromSource(row.getChangeTime());
            this.getMetrics().setActiveTransactionCount(this.getTransactionCache().getTransactionCount());
            this.getMetrics().setBufferedEventCount(this.getTransactionCache().getTransactionEvents());
        }
        this.updateCommitMetrics(row, Duration.between(start, Instant.now()), numEvents);
    }

    /**
     * Handles a ROLLBACK row. A cached transaction is finalized as rolled back and the cache
     * metrics are refreshed; an unknown transaction is removed from the abandoned set. The
     * rollback metrics are updated in both cases.
     *
     * @param event the ROLLBACK row
     */
    @Override
    protected void handleRollbackEvent(LogMinerEventRow event) {
        final String transactionId = event.getTransactionId();

        if (!getTransactionCache().containsTransaction(transactionId)) {
            LOGGER.debug("Transaction {} not found in cache, no events to rollback.", transactionId);
            getTransactionCache().removeAbandonedTransaction(transactionId);
        }
        else {
            LOGGER.debug("Transaction {} was rolled back.", transactionId);
            finalizeTransaction(transactionId, event.getScn(), true);
            getMetrics().setActiveTransactionCount(getTransactionCache().getTransactionCount());
            getMetrics().setBufferedEventCount(getTransactionCache().getTransactionEvents());
        }

        getMetrics().incrementRolledBackTransactionCount();
        getMetrics().addRolledBackTransactionId(transactionId);
        getBatchMetrics().rollbackObserved();
    }

    /**
     * Handles a DDL row. Rows that are skipped, already recorded in the schema-changes cache, or
     * that carry no table name are ignored. Otherwise the offsets are advanced, the change is
     * remembered in the schema-changes cache when LOB support is enabled, and the schema change
     * is dispatched.
     *
     * @param event the DDL row
     * @throws InterruptedException if dispatching is interrupted
     */
    @Override
    protected void handleSchemaChangeEvent(LogMinerEventRow event) throws InterruptedException {
        if (isSchemaChangeEventSkipped(event)) {
            return;
        }
        if (hasSchemaChangeBeenSeen(event)) {
            LOGGER.trace("DDL: Scn {}, SQL '{}' has already been processed, skipped.", event.getScn(), event.getRedoSql());
            return;
        }
        if (Strings.isNullOrEmpty(event.getTableName())) {
            return;
        }

        Loggings.logDebugAndTraceRecord(LOGGER, event, "Processing DDL event with SCN {}: {}",
                new Object[]{ event.getScn(), event.getRedoSql() });

        if (canAdvanceLowerScnBoundaryOnSchemaChange(event)) {
            LOGGER.debug("Schema change advanced offset SCN to {}", event.getScn());
            getOffsetContext().setScn(event.getScn());
        }

        LOGGER.debug("Schema change advanced offset commit SCN to {} for thread {}", event.getScn(), event.getThread());
        getOffsetContext().getCommitScn().recordCommit(event);

        if (getConfig().isLobEnabled()) {
            final String scnKey = event.getScn().toString();
            getSchemaChangesCache().put(scnKey, event.getTableId().identifier());
        }

        dispatchSchemaChangeEventInternal(event);
    }

    /**
     * Handles a TRUNCATE row by enqueueing a {@link TruncateEvent} when the table resolves.
     * A {@link SQLException} is logged as a warning and counted in the metrics, not rethrown.
     *
     * @param event the TRUNCATE row
     * @throws InterruptedException if processing is interrupted
     */
    @Override
    protected void handleTruncateEvent(LogMinerEventRow event) throws InterruptedException {
        try {
            final Table table = getTableForDataEvent(event);
            if (table == null) {
                return;
            }
            LOGGER.debug("Dispatching TRUNCATE event for table '{}' with SCN {}", table.id(), event.getScn());
            enqueueEvent(event, new TruncateEvent(event, parseTruncateEvent(event)));
        }
        catch (SQLException sqlFailure) {
            LOGGER.warn("Failed to process truncate event", sqlFailure);
            getMetrics().incrementWarningCount();
        }
    }

    /**
     * Rows flagged as rollback are applied as an undo against the buffered transaction and are
     * not dispatched; all other rows are allowed.
     *
     * @param event the data change row
     * @return {@code false} for rows with the rollback flag, {@code true} otherwise
     */
    @Override
    protected boolean isDispatchAllowedForDataChangeEvent(LogMinerEventRow event) {
        if (!event.isRollbackFlag()) {
            return true;
        }
        removeEventWithRowId(event);
        return false;
    }

    /**
     * Handles a replication marker row: a cached transaction with that id has its events and
     * itself removed from the cache, and the id is always removed from the abandoned set.
     *
     * @param event the replication marker row
     */
    @Override
    protected void handleReplicationMarkerEvent(LogMinerEventRow event) {
        final String transactionId = event.getTransactionId();
        final Transaction markedTransaction = getTransactionCache().getTransaction(transactionId);

        if (markedTransaction != null) {
            LOGGER.debug("Skipping GoldenGate replication marker for transaction {} with SCN {}", transactionId, event.getScn());
            getTransactionCache().removeTransactionEvents(markedTransaction);
            getTransactionCache().removeTransaction(markedTransaction);
        }

        getTransactionCache().removeAbandonedTransaction(transactionId);
    }

    /**
     * @return {@code true} when the batch metrics report that no JDBC rows were seen
     */
    @Override
    protected boolean isNoDataProcessedInBatchAndAtEndOfArchiveLogs() {
        final boolean sawJdbcRows = getMetrics().getBatchMetrics().hasJdbcRows();
        return !sawJdbcRows;
    }

    /**
     * @return the identifiers of all transactions currently held in the transaction cache
     */
    @Override
    protected List<String> getActiveTransactionIds() {
        return getTransactionCache().streamTransactionsAndReturn(
                transactions -> transactions.map(transaction -> transaction.getTransactionId()).toList());
    }

    /**
     * Computes the start positions for the next mining session after a batch.
     *
     * <p>When the batch produced no JDBC rows, a heartbeat is dispatched and the current session
     * start is kept. Otherwise old transactions are abandoned per the retention setting, the
     * processed-transaction and schema-change caches are pruned, and the next start is derived
     * from the eldest cached transaction or, when the cache is empty, from the end of the range.
     *
     * @param startScn the read start SCN of the batch just processed
     * @param endScn the upper SCN bound of the batch just processed
     * @param maxCommittedScn the maximum committed SCN recorded in the offsets
     * @return the mining session start SCN and the read start SCN for the next iteration
     * @throws InterruptedException if dispatching a heartbeat is interrupted
     */
    private ProcessResult calculateNewStartScn(Scn startScn, Scn endScn, Scn maxCommittedScn) throws InterruptedException {
        if (!getBatchMetrics().hasJdbcRows()) {
            getEventDispatcher().dispatchHeartbeatEvent(getPartition(), getOffsetContext());
            return new ProcessResult(getLogMinerContext().getCurrentSessionStartScn(), startScn);
        }

        abandonTransactions(getConfig().getLogMiningTransactionRetention());

        final Optional<LogMinerTransactionCache.ScnDetails> eldest = getTransactionCache().getEldestTransactionScnDetailsInCache();
        final Scn oldestCachedScn = eldest.isPresent() ? eldest.get().scn() : Scn.NULL;
        final Instant oldestCachedChangeTime = eldest.isPresent() ? eldest.get().changeTime() : null;
        final boolean nothingCached = oldestCachedScn.isNull();

        if (nothingCached) {
            getSchemaChangesCache().removeIf(entry -> true);
        }
        else {
            // Entries older than the eldest in-flight transaction can no longer be needed.
            getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf((String) entry.getValue()).compareTo(oldestCachedScn) < 0);
            getSchemaChangesCache().removeIf(entry -> Scn.valueOf((String) entry.getKey()).compareTo(oldestCachedScn) < 0);
        }

        // Never advance beyond the last SCN that was actually processed.
        final boolean processedBelowEnd = !lastProcessedScn.isNull() && lastProcessedScn.compareTo(endScn) < 0;
        final Scn effectiveEndScn = processedBelowEnd ? lastProcessedScn : endScn;

        getMetrics().setOldestScnDetails(oldestCachedScn, oldestCachedChangeTime);

        if (nothingCached) {
            final Scn nextSessionStartScn = effectiveEndScn.subtract(Scn.ONE);
            if (!maxCommittedScn.isNull()) {
                getOffsetContext().setScn(maxCommittedScn);
                getEventDispatcher().dispatchHeartbeatEvent(getPartition(), getOffsetContext());
            }
            getMetrics().setOffsetScn(getOffsetContext().getScn());
            return new ProcessResult(nextSessionStartScn, effectiveEndScn);
        }

        final Scn nextSessionStartScn = oldestCachedScn.subtract(Scn.ONE);
        getOffsetContext().setScn(nextSessionStartScn);
        getEventDispatcher().dispatchHeartbeatEvent(getPartition(), getOffsetContext());
        getMetrics().setOffsetScn(getOffsetContext().getScn());

        final Scn nextReadScn = getConfig().isLobEnabled() ? nextSessionStartScn : effectiveEndScn;
        return new ProcessResult(nextSessionStartScn, nextReadScn);
    }

    /**
     * Returns the SCN of the eldest transaction in the cache and publishes it to the metrics.
     * When there is none, the metrics are set to SCN {@code -1} with no change time and
     * {@link Scn#NULL} is returned.
     *
     * @return the eldest cached SCN, or {@link Scn#NULL} if unavailable
     */
    private Scn calculateSmallestScn() {
        final Optional<LogMinerTransactionCache.ScnDetails> eldest = getTransactionCache().getEldestTransactionScnDetailsInCache();

        final Optional<Scn> eldestScn = eldest.map(details -> {
            getMetrics().setOldestScnDetails(details.scn(), details.changeTime());
            return details.scn();
        });

        return eldestScn.orElseGet(() -> {
            getMetrics().setOldestScnDetails(Scn.valueOf(-1), null);
            return Scn.NULL;
        });
    }

    /**
     * Applies an undo row by removing the buffered event with the same row id.
     *
     * <p>The row's own transaction is tried first. If that transaction is not cached and its id
     * ends with the no-sequence suffix, every cached transaction sharing the first eight
     * characters of the id is tried. A warning is logged whenever the undo cannot be applied.
     *
     * @param row the undo row
     */
    private void removeEventWithRowId(LogMinerEventRow row) {
        final String eventNotFoundMessage = "Cannot apply undo change in transaction '{}' with SCN '{}' on table '{}' since event with row-id {} was not found.";

        final Transaction owner = getTransactionCache().getTransaction(row.getTransactionId());
        if (owner != null) {
            if (!removeTransactionEventWithRowId(owner, row)) {
                Loggings.logWarningAndTraceRecord(LOGGER, row, eventNotFoundMessage,
                        new Object[]{ row.getTransactionId(), row.getScn(), row.getTableId(), row.getRowId() });
            }
            return;
        }

        if (row.getTransactionId().endsWith(NO_SEQUENCE_TRX_ID_SUFFIX)) {
            final String prefix = row.getTransactionId().substring(0, 8);
            LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
            LOGGER.debug("Checking all transactions with prefix '{}'", prefix);

            final boolean undone = getTransactionCache().streamTransactionsAndReturn(
                    transactions -> transactions
                            .filter(candidate -> candidate.getTransactionId().startsWith(prefix))
                            .anyMatch(candidate -> removeTransactionEventWithRowId(candidate, row)));
            if (!undone) {
                Loggings.logWarningAndTraceRecord(LOGGER, row, eventNotFoundMessage,
                        new Object[]{ row.getTransactionId(), row.getScn(), row.getTableId(), row.getRowId() });
            }
            return;
        }

        if (getConfig().isLobEnabled()) {
            Loggings.logWarningAndTraceRecord(LOGGER, row,
                    "Failed to apply undo change with SCN '{}' on table '{}' in transaction '{}' with row-id '{}'",
                    new Object[]{ row.getScn(), row.getTableId(), row.getTransactionId(), row.getRowId() });
        }
        else {
            Loggings.logWarningAndTraceRecord(LOGGER, row,
                    "Cannot apply undo change with SCN '{}' on table '{}' since transaction '{}' was not found.",
                    new Object[]{ row.getScn(), row.getTableId(), row.getTransactionId() });
        }
    }

    /**
     * Asks the transaction cache to remove the event of {@code transaction} that has the same
     * row-id as {@code row}; on success the partial rollback metrics are updated.
     *
     * @param transaction the transaction whose buffered events are searched
     * @param row the undo row supplying the row-id to remove
     * @return {@code true} if the cache reported that an event was removed, {@code false} otherwise
     */
    private boolean removeTransactionEventWithRowId(Transaction transaction, LogMinerEventRow row) {
        final boolean removed = this.getTransactionCache().removeTransactionEventWithRowId(transaction, row.getRowId());
        if (!removed) {
            return false;
        }
        this.getMetrics().increasePartialRollbackCount();
        this.getBatchMetrics().partialRollbackObserved();
        Loggings.logDebugAndTraceRecord(LOGGER, row,
                "Undo change on table '{}' applied to transaction event with row-id '{}'",
                row.getTableId(), row.getRowId());
        return true;
    }

    /**
     * Performs the cache bookkeeping that follows the removal of a transaction.
     *
     * @param transaction the transaction that was removed
     * @param isAbandoned {@code true} to register the transaction as abandoned; {@code false} to
     *        clear any abandoned marker recorded for its transaction id
     */
    private void cleanupAfterTransactionRemovedFromCache(Transaction transaction, boolean isAbandoned) {
        if (!isAbandoned) {
            this.getTransactionCache().removeAbandonedTransaction(transaction.getTransactionId());
        }
        else {
            this.getTransactionCache().abandon(transaction);
        }
        // The buffered events are dropped in both cases.
        this.getTransactionCache().removeTransactionEvents(transaction);
    }

    /**
     * Decides whether an event can be skipped because it has already been handled.
     *
     * @param event the LogMiner row being inspected
     * @return {@code true} if the event's transaction was recently processed or is abandoned;
     *         otherwise the result of {@code isEventIncludedInSnapshot(event)}
     */
    @Override
    protected boolean hasEventBeenProcessed(LogMinerEventRow event) {
        final String transactionId = event.getTransactionId();
        final boolean processed;
        if (this.isRecentlyProcessed(transactionId)) {
            LOGGER.debug("Transaction {} has been seen by connector, skipped.", transactionId);
            processed = true;
        }
        else if (this.getTransactionCache().isAbandoned(transactionId)) {
            LOGGER.debug("Event for abandoned transaction {}, skipped.", transactionId);
            processed = true;
        }
        else {
            processed = this.isEventIncludedInSnapshot(event);
        }
        return processed;
    }

    /**
     * Tells whether a transaction is skipped because of its user name or its client id.
     *
     * @param transaction the transaction to test; may be {@code null}
     * @return {@code false} for a {@code null} transaction; otherwise {@code true} when either
     *         the user name or the client id is reported as skipped
     */
    private boolean isTransactionSkippedAtCommit(Transaction transaction) {
        if (transaction == null) {
            return false;
        }
        // The client id is only consulted when the user name is not skipped.
        if (this.isUserNameSkipped(transaction.getUserName())) {
            return true;
        }
        return this.isClientIdSkipped(transaction.getClientId());
    }

    /**
     * Finishes the handling of a transaction identified by its id.
     *
     * <p>For a rollback the cached transaction, if any, is removed together with its events.
     * The abandoned marker for the id is always cleared, and when LOB support is enabled the
     * id is recorded in the processed-transactions cache along with the event SCN.
     *
     * @param transactionId the id of the transaction being finalized
     * @param eventScn the SCN stored (as a string) in the processed-transactions cache
     * @param rollbackEvent {@code true} when finalizing because of a rollback
     */
    private void finalizeTransaction(String transactionId, Scn eventScn, boolean rollbackEvent) {
        if (rollbackEvent) {
            final Transaction rolledBack = this.getTransactionCache().getTransaction(transactionId);
            if (rolledBack != null) {
                this.getTransactionCache().removeTransactionEvents(rolledBack);
                this.getTransactionCache().removeTransaction(rolledBack);
            }
        }

        this.getTransactionCache().removeAbandonedTransaction(transactionId);

        if (this.getConfig().isLobEnabled()) {
            this.getProcessedTransactionsCache().put(transactionId, eventScn.toString());
        }
    }

    /**
     * Checks whether the transaction cache is either empty or holds only the transaction that
     * the given row belongs to.
     *
     * @param row the row whose transaction id is compared with the cached transaction
     * @return {@code true} when the cache has no transactions, or exactly one whose id equals
     *         the row's transaction id; {@code false} otherwise
     */
    private boolean canAdvanceLowerScnBoundaryOnSchemaChange(LogMinerEventRow row) {
        switch (this.getTransactionCache().getTransactionCount()) {
            case 0:
                return true;
            case 1:
                return this.getTransactionCache().streamTransactionsAndReturn(
                        stream -> stream
                                .map(Transaction::getTransactionId)
                                .allMatch(cachedId -> cachedId.equals(row.getTransactionId())));
            default:
                return false;
        }
    }

    /**
     * Buffers a dispatched event under the transaction of the originating row, creating and
     * registering the transaction first when it is not cached yet.
     *
     * <p>The event is stored under the transaction's next event id unless the cache already
     * contains an event for that id. The transaction is synchronized with the cache at the end
     * in either case.
     *
     * @param event the LogMiner row the event originates from
     * @param dispatchedEvent the event to buffer
     * @throws InterruptedException declared by the signature and propagated from the calls made here
     */
    @Override
    protected void enqueueEvent(LogMinerEventRow event, LogMinerEvent dispatchedEvent) throws InterruptedException {
        final String transactionId = event.getTransactionId();

        Transaction transaction = this.getTransactionCache().getTransaction(transactionId);
        if (transaction == null) {
            LOGGER.trace("Transaction {} is not in cache, creating.", transactionId);
            transaction = this.transactionFactory.createTransaction(event);
            this.getTransactionCache().addTransaction(transaction);
            this.getMetrics().setActiveTransactionCount(this.getTransactionCache().getTransactionCount());
        }

        final int eventId = transaction.getNextEventId();
        final boolean alreadyBuffered = this.getTransactionCache().containsTransactionEvent(transaction, eventId);
        if (!alreadyBuffered) {
            LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, transaction.getEventId(eventId));
            this.getTransactionCache().addTransactionEvent(transaction, eventId, dispatchedEvent);
            this.getMetrics().calculateLagFromSource(event.getChangeTime());
            this.getMetrics().setBufferedEventCount(this.getTransactionCache().getTransactionEvents());
        }

        this.getTransactionCache().syncTransaction(transaction);
    }

    /**
     * Tests a transaction's event count against the configured per-transaction event threshold.
     *
     * @param transaction the transaction to test
     * @return {@code false} when the configured threshold is zero or negative; otherwise
     *         {@code true} when the transaction's event count has reached the threshold
     */
    private boolean isTransactionOverEventThreshold(Transaction transaction) {
        final long threshold = this.getConfig().getLogMiningBufferTransactionEventsThreshold();
        // A non-positive threshold disables the check, so the event count is not even read.
        return threshold > 0L && (long) this.getTransactionEventCount(transaction) >= threshold;
    }

    /**
     * Abandons a transaction that exceeded the allowed number of events: logs a warning, removes
     * the transaction from the cache, marks it abandoned and updates the warning and
     * oversized-transaction metrics.
     *
     * @param transaction the oversized transaction
     */
    private void abandonTransactionOverEventThreshold(Transaction transaction) {
        final String transactionId = transaction.getTransactionId();
        LOGGER.warn("Transaction {} exceeds maximum allowed number of events, transaction will be abandoned.", transactionId);
        this.getMetrics().incrementWarningCount();

        // Remove the transaction itself, then register it as abandoned and drop its events.
        this.getTransactionCache().getAndRemoveTransaction(transactionId);
        this.cleanupAfterTransactionRemovedFromCache(transaction, true);

        this.getMetrics().incrementOversizedTransactionCount();
    }

    /**
     * Abandons every cached transaction whose start SCN is at or below a threshold SCN derived
     * from the given retention.
     *
     * <p>Nothing happens when the retention is {@link Duration#ZERO}, when the cache reports no
     * eldest transaction, or when no threshold SCN could be determined. Otherwise, if the
     * threshold is at or above the eldest cached SCN, the matching transactions are abandoned,
     * the metrics are refreshed and the offset SCN is set to the threshold. A heartbeat is
     * dispatched whenever a threshold was determined, even if no transaction qualified.
     *
     * @param retention how long a transaction may be retained; {@link Duration#ZERO} disables the check
     * @throws InterruptedException declared by the signature and propagated from the calls made here
     */
    protected void abandonTransactions(Duration retention) throws InterruptedException {
        if (Duration.ZERO.equals(retention)) {
            return;
        }

        final Scn eldestScn = this.getTransactionCache().getEldestTransactionScnDetailsInCache()
                .map(LogMinerTransactionCache.ScnDetails::scn)
                .orElse(Scn.NULL);
        if (eldestScn.isNull()) {
            return;
        }

        final Optional<Scn> candidate = this.getLastScnToAbandon(this.getConnection(), retention);
        if (!candidate.isPresent()) {
            return;
        }

        final Scn thresholdScn = candidate.get();
        if (thresholdScn.compareTo(eldestScn) >= 0) {
            final Map abandoned = this.getTransactionCache().streamTransactionsAndReturn(
                    stream -> stream
                            .filter(t -> t.getStartScn().compareTo(thresholdScn) <= 0)
                            .collect(Collectors.toMap(Transaction::getTransactionId, t -> t)));

            if (!abandoned.isEmpty()) {
                LOGGER.warn("All transactions with SCN <= {} will be abandoned.", thresholdScn);
            }
            for (Object item : abandoned.entrySet()) {
                final Map.Entry entry = (Map.Entry) item;
                final String transactionId = (String) entry.getKey();
                final Transaction transaction = (Transaction) entry.getValue();
                LOGGER.warn("Transaction {} (start SCN {}, change time {}, redo thread {}, {} events{}) is being abandoned.",
                        transactionId,
                        transaction.getStartScn(),
                        transaction.getChangeTime(),
                        transaction.getRedoThreadId(),
                        transaction.getNumberOfEvents(),
                        this.getLoggedAbandonedTransactionTableNames(transaction));
                // Mark as abandoned and drop the events before removing the transaction itself.
                this.cleanupAfterTransactionRemovedFromCache(transaction, true);
                this.getTransactionCache().removeTransaction(transaction);
                this.getMetrics().addAbandonedTransactionId(transactionId);
            }

            if (!abandoned.isEmpty()) {
                this.getMetrics().setActiveTransactionCount(this.getTransactionCache().getTransactionCount());
                this.getMetrics().setBufferedEventCount(this.getTransactionCache().getTransactionEvents());
            }

            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("List of transactions in the cache before transactions being abandoned: [{}]",
                        String.join(",", abandoned.keySet()));
                this.getTransactionCache().transactions(
                        stream -> LOGGER.debug("List of transactions in the cache after transactions begin abandoned: [{}]",
                                stream.map(Transaction::getTransactionId).collect(Collectors.joining(","))));
            }

            // Refresh the oldest-SCN metric from whatever remains in the cache.
            final Optional<LogMinerTransactionCache.ScnDetails> remaining = this.getTransactionCache().getEldestTransactionScnDetailsInCache();
            if (remaining.isPresent()) {
                final LogMinerTransactionCache.ScnDetails details = remaining.get();
                this.getMetrics().setOldestScnDetails(details.scn(), details.changeTime());
            }
            else {
                this.getMetrics().setOldestScnDetails(Scn.NULL, null);
            }

            this.getOffsetContext().setScn(thresholdScn);
        }

        this.getEventDispatcher().dispatchHeartbeatEvent((Partition) this.getPartition(), (OffsetContext) this.getOffsetContext());
    }

    /**
     * Builds the optional table-name suffix appended to the "is being abandoned" warning.
     *
     * <p>The table identifiers of all events buffered for the transaction are collected only
     * when {@code ABANDONED_DETAILS_LOGGER} has debug logging enabled; otherwise no events are
     * visited and an empty string is returned.
     *
     * @param transaction the transaction whose buffered events are inspected
     * @return {@code ", <count> tables [<comma separated identifiers>]"} when detail logging is
     *         enabled, otherwise the empty string
     * @throws InterruptedException declared by the signature and propagated from the calls made here
     */
    private String getLoggedAbandonedTransactionTableNames(Transaction transaction) throws InterruptedException {
        if (!ABANDONED_DETAILS_LOGGER.isDebugEnabled()) {
            return "";
        }
        // Typed set instead of a raw HashSet, so add() and String.join() are checked by the compiler.
        final HashSet<String> tableNames = new HashSet<>();
        this.getTransactionCache().forEachEvent(transaction, event -> {
            tableNames.add(event.getTableId().identifier());
            // Returning true for every event, so the callback never asks for an early stop.
            return true;
        });
        return String.format(", %d tables [%s]", tableNames.size(), String.join(",", tableNames));
    }

    /**
     * Determines the SCN at or below which transactions are considered for abandonment.
     *
     * <p>The SCN is obtained by running the query built by
     * {@code SqlUtils.getScnByTimeDeltaQuery} for the last processed SCN and the retention. If
     * that query fails with a {@link SQLException} and a last processed change time is known,
     * a fallback based on the cached transactions' change times is tried instead.
     *
     * @param connection the connection used to run the query
     * @param retention the retention the threshold is derived from
     * @return the threshold SCN; empty when the last processed SCN is null, or when the query
     *         failed and the fallback produced no SCN (the failure is logged and counted as an error)
     */
    private Optional<Scn> getLastScnToAbandon(OracleConnection connection, Duration retention) {
        if (this.lastProcessedScn.isNull()) {
            return Optional.empty();
        }
        try {
            final String query = SqlUtils.getScnByTimeDeltaQuery(this.lastProcessedScn, retention);
            final BigInteger scnToAbandon = connection.singleOptionalValue(query, rs -> rs.getBigDecimal(1).toBigInteger());
            return Optional.of(new Scn(scnToAbandon));
        }
        catch (SQLException e) {
            if (this.lastProcessedScnChangeTime != null) {
                final Scn fallbackScn = this.getLastScnToAbandonFallbackByTransactionChangeTime(retention);
                if (!fallbackScn.isNull()) {
                    return Optional.of(fallbackScn);
                }
            }
            LOGGER.error("Cannot fetch SCN {} by given duration to calculate SCN to abandon", this.lastProcessedScn, e);
            this.getMetrics().incrementErrorCount();
            return Optional.empty();
        }
    }

    /**
     * Fallback for computing the abandonment threshold from the cached transactions themselves.
     *
     * <p>A transaction qualifies when the absolute difference, in whole minutes, between its
     * change time and the last processed change time is greater than zero and greater than the
     * retention in minutes. The highest start SCN among the qualifying transactions is returned.
     *
     * @param retention the retention to compare each transaction's age against
     * @return the highest start SCN of the qualifying transactions, or {@link Scn#NULL} if none qualifies
     */
    private Scn getLastScnToAbandonFallbackByTransactionChangeTime(Duration retention) {
        final long retentionMinutes = retention.toMinutes();
        LOGGER.debug("Getting abandon SCN breakpoint based on change time {} (retention {} minutes).",
                this.lastProcessedScnChangeTime, retentionMinutes);
        return this.getTransactionCache().streamTransactionsAndReturn(stream -> stream
                .filter(candidate -> {
                    final Instant changeTime = candidate.getChangeTime();
                    final long ageMinutes = Duration.between(this.lastProcessedScnChangeTime, changeTime).abs().toMinutes();
                    LOGGER.debug("Transaction {} with SCN {} started at {}, age is {} minutes.",
                            candidate.getTransactionId(), candidate.getStartScn(), changeTime, ageMinutes);
                    return ageMinutes > 0L && ageMinutes > retentionMinutes;
                })
                .max(Comparator.comparing(Transaction::getStartScn))
                .map(Transaction::getStartScn)
                .orElse(Scn.NULL));
    }

    /**
     * Logs, at debug level, the id and start SCN of every transaction in the cache. Nothing is
     * logged when debug logging is disabled or the cache is empty.
     */
    private void logActiveTransactions() {
        if (!LOGGER.isDebugEnabled()) {
            return;
        }
        if (this.getTransactionCache().isEmpty()) {
            return;
        }
        this.cacheProvider.getTransactionCache().transactions(transactions -> {
            final String summary = transactions
                    .map(t -> t.getTransactionId() + " (" + String.valueOf(t.getStartScn()) + ")")
                    .collect(Collectors.joining(","));
            LOGGER.debug("All active transactions: {}", summary);
        });
    }

    /**
     * Abandons a single transaction on explicit request, identified by its transaction id.
     *
     * <p>After the transaction is removed from the cache and marked abandoned, the abandoned,
     * active-transaction, buffered-event and oldest-SCN metrics are refreshed. A heartbeat is
     * dispatched when heartbeats are enabled.
     *
     * @param transactionId the id of the transaction to abandon
     * @return {@code true} if the transaction was removed and abandoned; {@code false} if the id
     *         is null or empty, or the transaction is not (or no longer) in the cache
     * @throws InterruptedException declared by the signature and propagated from the calls made here
     */
    public boolean abandonTransactionById(String transactionId) throws InterruptedException {
        if (Strings.isNullOrEmpty(transactionId)) {
            LOGGER.warn("Abandon transaction requested with null/empty transaction id");
            return false;
        }
        if (!this.getTransactionCache().containsTransaction(transactionId)) {
            LOGGER.warn("Transaction '{}' not found in cache, cannot abandon", transactionId);
            return false;
        }

        final Transaction removed = this.getTransactionCache().getAndRemoveTransaction(transactionId);
        if (removed == null) {
            // The id was present a moment ago but the removal returned nothing.
            LOGGER.warn("Transaction '{}' was not present when attempting to abandon", transactionId);
            return false;
        }

        LOGGER.info("Manually abandoning transaction {} as requested", transactionId);
        try {
            this.cleanupAfterTransactionRemovedFromCache(removed, true);
        }
        catch (Exception e) {
            LOGGER.error("Failed to cleanup after abandoning transaction {}", transactionId, e);
            this.getMetrics().incrementErrorCount();
            throw e;
        }

        this.getMetrics().addAbandonedTransactionId(transactionId);
        this.getMetrics().setActiveTransactionCount(this.getTransactionCache().getTransactionCount());
        this.getMetrics().setBufferedEventCount(this.getTransactionCache().getTransactionEvents());

        final Optional<LogMinerTransactionCache.ScnDetails> eldest = this.getTransactionCache().getEldestTransactionScnDetailsInCache();
        if (eldest.isPresent()) {
            final LogMinerTransactionCache.ScnDetails details = eldest.get();
            this.getMetrics().setOldestScnDetails(details.scn(), details.changeTime());
        }
        else {
            this.getMetrics().setOldestScnDetails(Scn.NULL, null);
        }

        if (!this.getEventDispatcher().heartbeatsEnabled()) {
            LOGGER.info("Heartbeats are not enabled, offsets will be updated on the next committed transaction");
        }
        else {
            this.getEventDispatcher().dispatchHeartbeatEvent((Partition) this.getPartition(), (OffsetContext) this.getOffsetContext());
        }

        LOGGER.info("Successfully dropped transaction '{}' from Oracle LogMiner buffer via manual request", transactionId);
        return true;
    }

    /**
     * Immutable pair of SCN values produced as the result of a processing pass.
     *
     * @param miningSessionStartScn the SCN passed as the mining session start by the code that
     *        builds this result
     * @param readStartScn the SCN passed as the read start by the code that builds this result
     */
    public record ProcessResult(Scn miningSessionStartScn, Scn readStartScn) {
    }
}

