/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer.processor.infinispan;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider;
import io.debezium.connector.oracle.logminer.processor.infinispan.InMemoryPendingTransactionsCache;
import io.debezium.connector.oracle.logminer.processor.infinispan.InfinispanTransaction;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.TableId;
import io.debezium.util.Loggings;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.infinispan.commons.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractInfinispanLogMinerEventProcessor
extends AbstractLogMinerEventProcessor<InfinispanTransaction>
implements CacheProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractInfinispanLogMinerEventProcessor.class);
    private final OracleConnection jdbcConnection;
    private final LogMinerStreamingChangeEventSourceMetrics metrics;
    private final OraclePartition partition;
    private final OracleOffsetContext offsetContext;
    private final EventDispatcher<OraclePartition, TableId> dispatcher;
    private InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache();
    private static AbstractInfinispanLogMinerEventProcessor instance;

    public AbstractInfinispanLogMinerEventProcessor(ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, EventDispatcher<OraclePartition, TableId> dispatcher, OraclePartition partition, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, LogMinerStreamingChangeEventSourceMetrics metrics) {
        super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics, jdbcConnection);
        this.jdbcConnection = jdbcConnection;
        this.metrics = metrics;
        this.partition = partition;
        this.offsetContext = offsetContext;
        this.dispatcher = dispatcher;
        instance = this;
    }

    protected void reCreateInMemoryCache() {
        try (Stream trStream = this.getTransactionCache().keySet().stream();){
            trStream.forEach(tr -> {
                try (Stream eventStream = this.getEventCache().keySet().stream();){
                    int count = (int)eventStream.filter(k -> k.startsWith(tr + "-")).count();
                    LOGGER.info("Re-creating in memory cache of event count for transaction '" + tr + "'. No of events found: " + count);
                    this.inMemoryPendingTransactionsCache.initKey((String)tr, count);
                }
            });
        }
    }

    public static void logCacheStats() {
        if (instance != null) {
            instance.displayCacheStatistics();
        } else {
            LOGGER.trace("AbstractInfinispanLogMinerEventProcessor is not initialized, skipping logging stats.");
        }
    }

    @Override
    public void displayCacheStatistics() {
        LOGGER.info("Overall Cache Statistics:");
        LOGGER.info("\tTransactions        : {}", (Object)this.getTransactionCache().size());
        LOGGER.info("\tRecent Transactions : {}", (Object)this.getProcessedTransactionsCache().size());
        LOGGER.info("\tSchema Changes      : {}", (Object)this.getSchemaChangesCache().size());
        LOGGER.info("\tEvents              : {}", (Object)this.getEventCache().size());
        if (!this.getEventCache().isEmpty() && LOGGER.isDebugEnabled()) {
            try (Stream stream = this.getEventCache().keySet().stream();){
                stream.forEach(eventKey -> LOGGER.debug("\t\tFound Key: {}", eventKey));
            }
        }
    }

    @Override
    protected boolean isRecentlyProcessed(String transactionId) {
        return this.getProcessedTransactionsCache().containsKey((Object)transactionId);
    }

    @Override
    protected InfinispanTransaction createTransaction(LogMinerEventRow row) {
        return new InfinispanTransaction(row.getTransactionId(), row.getScn(), row.getChangeTime(), row.getUserName(), row.getThread());
    }

    @Override
    protected void removeEventWithRowId(LogMinerEventRow row) {
        List<String> eventKeys = this.getTransactionKeysWithPrefix(row.getTransactionId() + "-");
        if (eventKeys.isEmpty() && this.isTransactionIdWithNoSequence(row.getTransactionId())) {
            String transactionPrefix = this.getTransactionIdPrefix(row.getTransactionId());
            LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", (Object)row.getTransactionId());
            LOGGER.debug("Checking all transactions with prefix '{}'", (Object)transactionPrefix);
            eventKeys = this.getTransactionKeysWithPrefix(transactionPrefix);
            if (!eventKeys.isEmpty()) {
                eventKeys.sort(EventKeySortComparator.INSTANCE.reversed());
                for (String eventKey : eventKeys) {
                    LogMinerEvent event = (LogMinerEvent)this.getEventCache().get((Object)eventKey);
                    if (event == null || !event.getRowId().equals(row.getRowId())) continue;
                    Loggings.logDebugAndTraceRecord((Logger)LOGGER, (Object)row, (String)"Undo change on table '{}' applied to transaction '{}'", (Object[])new Object[]{row.getTableId(), eventKey});
                    this.getEventCache().remove((Object)eventKey);
                    this.inMemoryPendingTransactionsCache.decrement(row.getTransactionId());
                    return;
                }
                Loggings.logWarningAndTraceRecord((Logger)LOGGER, (Object)row, (String)"Cannot undo change on table '{}' since event with row-id {} was not found.", (Object[])new Object[]{row.getTableId(), row.getRowId()});
            } else if (!this.getConfig().isLobEnabled()) {
                Loggings.logWarningAndTraceRecord((Logger)LOGGER, (Object)row, (String)"Cannot undo change on table '{}' since transaction '{}' was not found.", (Object[])new Object[]{row.getTableId(), row.getTransactionId()});
            }
        } else {
            eventKeys.sort(EventKeySortComparator.INSTANCE.reversed());
            for (String eventKey : eventKeys) {
                LogMinerEvent event = (LogMinerEvent)this.getEventCache().get((Object)eventKey);
                if (event == null || !event.getRowId().equals(row.getRowId())) continue;
                LOGGER.debug("Undo applied for event {}.", (Object)event);
                this.getEventCache().remove((Object)eventKey);
                this.inMemoryPendingTransactionsCache.decrement(row.getTransactionId());
                return;
            }
            Loggings.logWarningAndTraceRecord((Logger)LOGGER, (Object)row, (String)"Cannot undo change on table '{}' since event with row-id {} was not found.", (Object[])new Object[]{row.getTableId(), row.getRowId()});
        }
    }

    private List<String> getTransactionKeysWithPrefix(String prefix) {
        try (Stream stream = this.getEventCache().keySet().stream();){
            List<String> list = stream.filter(k -> k.startsWith(prefix)).collect(Collectors.toList());
            return list;
        }
    }

    @Override
    protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException {
        String transactionId = row.getTransactionId();
        if (this.isRecentlyProcessed(transactionId)) {
            LOGGER.debug("Transaction {} has been seen by connector, skipped.", (Object)transactionId);
            return;
        }
        super.processRow(partition, row);
    }

    @Override
    protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
        return this.getSchemaChangesCache().containsKey((Object)row.getScn().toString());
    }

    @Override
    protected InfinispanTransaction getAndRemoveTransactionFromCache(String transactionId) {
        InfinispanTransaction transaction = (InfinispanTransaction)this.getTransactionCache().get((Object)transactionId);
        if (transaction != null) {
            this.getTransactionCache().remove((Object)transactionId);
        }
        return transaction;
    }

    @Override
    protected void cleanupAfterTransactionRemovedFromCache(InfinispanTransaction transaction, boolean isAbandoned) {
        super.cleanupAfterTransactionRemovedFromCache(transaction, isAbandoned);
        this.removeEventsWithTransaction(transaction);
    }

    @Override
    protected Iterator<LogMinerEvent> getTransactionEventIterator(final InfinispanTransaction transaction) {
        return new Iterator<LogMinerEvent>(){
            private final int count;
            private LogMinerEvent nextEvent;
            private int index;
            {
                this.count = transaction.getNumberOfEvents();
                this.index = 0;
            }

            @Override
            public boolean hasNext() {
                while (this.index < this.count) {
                    this.nextEvent = (LogMinerEvent)AbstractInfinispanLogMinerEventProcessor.this.getEventCache().get((Object)transaction.getEventId(this.index));
                    if (this.nextEvent != null) break;
                    LOGGER.debug("Event {} must have been undone, skipped.", (Object)this.index);
                    ++this.index;
                }
                return this.index < this.count;
            }

            @Override
            public LogMinerEvent next() {
                ++this.index;
                return this.nextEvent;
            }
        };
    }

    @Override
    protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
        this.getAbandonedTransactionsCache().remove(transactionId);
        if (this.getConfig().isLobEnabled()) {
            this.getProcessedTransactionsCache().put((Object)transactionId, (Object)commitScn.toString());
        }
    }

    @Override
    protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) {
        InfinispanTransaction transaction = (InfinispanTransaction)this.getTransactionCache().get((Object)transactionId);
        if (transaction != null) {
            this.removeEventsWithTransaction(transaction);
            this.getTransactionCache().remove((Object)transactionId);
        }
        this.getAbandonedTransactionsCache().remove(transactionId);
        if (this.getConfig().isLobEnabled()) {
            this.getProcessedTransactionsCache().put((Object)transactionId, (Object)rollbackScn.toString());
        }
    }

    @Override
    protected void resetTransactionToStart(InfinispanTransaction transaction) {
        super.resetTransactionToStart(transaction);
        this.getTransactionCache().put((Object)transaction.getTransactionId(), (Object)transaction);
    }

    @Override
    protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
        super.handleSchemaChange(row);
        if (row.getTableName() != null) {
            this.getSchemaChangesCache().put((Object)row.getScn().toString(), (Object)row.getTableId().identifier());
        }
    }

    @Override
    protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
        if (this.getAbandonedTransactionsCache().contains(transactionId)) {
            LOGGER.warn("Event for abandoned transaction {}, skipped.", (Object)transactionId);
            return;
        }
        if (!this.isRecentlyProcessed(transactionId)) {
            InfinispanTransaction transaction = (InfinispanTransaction)this.getTransactionCache().get((Object)transactionId);
            if (transaction == null) {
                LOGGER.trace("Transaction {} is not in cache, creating.", (Object)transactionId);
                transaction = this.createTransaction(row);
            }
            if (this.isTransactionOverEventThreshold(transaction)) {
                this.abandonTransactionOverEventThreshold(transaction);
                return;
            }
            String eventKey = transaction.getEventId(transaction.getNextEventId());
            if (!this.getEventCache().containsKey((Object)eventKey)) {
                LOGGER.trace("Transaction {}, adding event reference at key {}", (Object)transactionId, (Object)eventKey);
                this.getEventCache().put((Object)eventKey, (Object)eventSupplier.get());
                this.metrics.calculateLagFromSource(row.getChangeTime());
                this.inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId());
            }
            this.getTransactionCache().put((Object)transactionId, (Object)transaction);
            this.metrics.setActiveTransactionCount(this.getTransactionCache().size());
        } else {
            LOGGER.warn("Event for transaction {} skipped as transaction has been processed.", (Object)transactionId);
        }
    }

    @Override
    protected int getTransactionEventCount(InfinispanTransaction transaction) {
        return this.inMemoryPendingTransactionsCache.getNumPending(transaction.getTransactionId());
    }

    @Override
    protected PreparedStatement createQueryStatement() throws SQLException {
        return this.jdbcConnection.connection().prepareStatement(this.getQueryString(), 1003, 1007, 1);
    }

    @Override
    protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException {
        Instant minCacheScnChangeTime;
        Scn minCacheScn;
        Optional oldestTransaction = this.getOldestTransactionInCache();
        if (oldestTransaction.isPresent()) {
            minCacheScn = ((InfinispanTransaction)oldestTransaction.get()).getStartScn();
            minCacheScnChangeTime = ((InfinispanTransaction)oldestTransaction.get()).getChangeTime();
        } else {
            minCacheScn = Scn.NULL;
            minCacheScnChangeTime = null;
        }
        if (!minCacheScn.isNull()) {
            this.abandonTransactions(this.getConfig().getLogMiningTransactionRetention());
            this.purgeCache(minCacheScn);
        } else {
            this.getSchemaChangesCache().entrySet().removeIf(e -> true);
        }
        if (this.getConfig().isLobEnabled()) {
            if (this.getTransactionCache().isEmpty() && !maxCommittedScn.isNull()) {
                this.offsetContext.setScn(maxCommittedScn);
                this.dispatcher.dispatchHeartbeatEvent((Partition)this.partition, (OffsetContext)this.offsetContext);
            } else if (!minCacheScn.isNull()) {
                this.getProcessedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf((String)entry.getValue()).compareTo(minCacheScn) < 0);
                this.offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1)));
                this.dispatcher.dispatchHeartbeatEvent((Partition)this.partition, (OffsetContext)this.offsetContext);
            }
            return this.offsetContext.getScn();
        }
        if (!this.getLastProcessedScn().isNull() && this.getLastProcessedScn().compareTo(endScn) < 0) {
            endScn = this.getLastProcessedScn();
        }
        this.offsetContext.setScn(endScn);
        this.metrics.setOldestScnDetails(minCacheScn, minCacheScnChangeTime);
        this.metrics.setOffsetScn(endScn);
        this.dispatcher.dispatchHeartbeatEvent((Partition)this.partition, (OffsetContext)this.offsetContext);
        return endScn;
    }

    protected abstract void purgeCache(Scn var1);

    protected <K, V> void removeIf(CloseableIterator<Map.Entry<K, V>> iterator, Predicate<Map.Entry<K, V>> filter) {
        try (CloseableIterator<Map.Entry<K, V>> it = iterator;){
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry)it.next();
                if (!filter.test(entry)) continue;
                it.remove();
            }
        }
    }

    private void removeEventsWithTransaction(InfinispanTransaction transaction) {
        for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
            this.getEventCache().remove((Object)transaction.getEventId(i));
        }
        this.inMemoryPendingTransactionsCache.remove(transaction.getTransactionId());
    }

    private static class EventKeySortComparator
    implements Comparator<String> {
        public static EventKeySortComparator INSTANCE = new EventKeySortComparator();

        private EventKeySortComparator() {
        }

        @Override
        public int compare(String o1, String o2) {
            String[] s2;
            if (o1 == null || !o1.contains("-")) {
                throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
            }
            if (o2 == null || !o2.contains("-")) {
                throw new IllegalStateException("Event Key must be in the format of <transaction>-<event>");
            }
            String[] s1 = o1.split("-");
            int result = s1[0].compareTo((s2 = o2.split("-"))[0]);
            if (result == 0) {
                result = Long.compare(Long.parseLong(s1[1]), Long.parseLong(s2[1]));
            }
            return result;
        }
    }
}

