/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.AbstractStreamingAdapter;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogFile;
import io.debezium.connector.oracle.logminer.LogMinerHelper;
import io.debezium.connector.oracle.logminer.LogMinerOracleOffsetContextLoader;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
import io.debezium.connector.oracle.logminer.SqlUtils;
import io.debezium.document.Document;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Clock;
import io.debezium.util.HexConverter;
import io.debezium.util.Strings;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming adapter of type {@code "logminer"}.
 *
 * <p>Besides wiring up the LogMiner-specific offset loader, history comparator and streaming
 * change event source, this class resolves the offset a snapshot starts from: the current
 * database SCN plus, depending on the configured transaction snapshot boundary mode, the
 * transactions that were still in progress at that SCN.
 */
public class LogMinerAdapter
extends AbstractStreamingAdapter {
    // NOTE(review): the two GET_TRANSACTION_SCN_* constants are not referenced anywhere in this class.
    private static final Duration GET_TRANSACTION_SCN_PAUSE = Duration.ofSeconds(1L);
    private static final int GET_TRANSACTION_SCN_ATTEMPTS = 5;
    private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerAdapter.class);
    public static final String TYPE = "logminer";

    /**
     * Creates the adapter.
     *
     * @param connectorConfig the connector configuration, handed to the superclass
     */
    public LogMinerAdapter(OracleConnectorConfig connectorConfig) {
        super(connectorConfig);
    }

    /**
     * @return the adapter type identifier, {@link #TYPE}
     */
    @Override
    public String getType() {
        return TYPE;
    }

    /**
     * Returns a comparator that orders history records by the SCN resolved from each document.
     *
     * @return a comparator treating {@code recorded} as at-or-before {@code desired} when its
     *         resolved SCN is less than or equal to the desired one
     */
    @Override
    public HistoryRecordComparator getHistoryRecordComparator() {
        return new HistoryRecordComparator(){

            @Override
            protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
                final Scn recordedScn = LogMinerAdapter.this.resolveScn(recorded);
                final Scn desiredScn = LogMinerAdapter.this.resolveScn(desired);
                return recordedScn.compareTo(desiredScn) <= 0;
            }
        };
    }

    /**
     * @return a new LogMiner offset context loader built from this adapter's connector configuration
     */
    @Override
    public OffsetContext.Loader<OracleOffsetContext> getOffsetContextLoader() {
        return new LogMinerOracleOffsetContextLoader(this.connectorConfig);
    }

    /**
     * Creates the LogMiner streaming change event source.
     *
     * <p>{@code taskContext} is accepted for interface compatibility but is not passed on.
     *
     * @return a new {@link LogMinerStreamingChangeEventSource}
     */
    @Override
    public StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getSource(OracleConnection connection, EventDispatcher<OraclePartition, TableId> dispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema, OracleTaskContext taskContext, Configuration jdbcConfig, OracleStreamingChangeEventSourceMetrics streamingMetrics) {
        return new LogMinerStreamingChangeEventSource(this.connectorConfig, connection, dispatcher, errorHandler, clock, schema, jdbcConfig, streamingMetrics);
    }

    /**
     * Resolves the offset at which the snapshot is taken.
     *
     * <p>The current SCN (and, unless the boundary mode is {@code SKIP}, the pending transactions
     * visible in the transaction view) is read using the supplied connection. The remaining work
     * is performed on a separate, non-auto-commit connection that is closed before returning.
     *
     * @param ctx the snapshot context, used to look up the latest table DDL SCN
     * @param connectorConfig the connector configuration
     * @param connection the snapshot connection
     * @return the offset context describing the snapshot position
     * @throws SQLException if a database operation fails
     * @throws DebeziumException if the current SCN could not be resolved
     */
    @Override
    public OracleOffsetContext determineSnapshotOffset(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx, OracleConnectorConfig connectorConfig, OracleConnection connection) throws SQLException {
        final Scn latestTableDdlScn = this.getLatestTableDdlScn(ctx, connection).orElse(null);
        final String tableName = LogMinerAdapter.getTransactionTableName(connectorConfig);
        // Insertion-ordered so transactions are reported in the order they were discovered.
        final Map<String, Scn> pendingTransactions = new LinkedHashMap<>();

        final Optional<Scn> resolvedScn = this.isPendingTransactionSkip(connectorConfig)
                ? this.getCurrentScn(latestTableDdlScn, connection)
                : this.getPendingTransactions(latestTableDdlScn, connection, pendingTransactions, tableName);
        final Scn currentScn = resolvedScn.orElseThrow(() -> new DebeziumException("Failed to resolve current SCN"));

        try (OracleConnection conn = new OracleConnection(connection.config(), () -> this.getClass().getClassLoader(), false)) {
            conn.setAutoCommit(false);
            if (!Strings.isNullOrEmpty(connectorConfig.getPdbName())) {
                // A PDB name is configured; switch the dedicated session to the CDB.
                conn.resetSessionToCdb();
            }
            return this.determineSnapshotOffset(connectorConfig, conn, currentScn, pendingTransactions, tableName);
        }
    }

    /**
     * Reads {@code CURRENT_SCN} from {@code V$DATABASE}, repeating the query for as long as the
     * inherited {@code areSameTimestamp} check reports {@code true} for the latest table DDL SCN
     * and the value just read.
     *
     * @param latestTableDdlScn the latest table DDL SCN, may be {@code null}
     * @param connection the connection to query with
     * @return the SCN read; {@link Scn#NULL} when the query returned no row
     * @throws SQLException if the query fails
     */
    private Optional<Scn> getCurrentScn(Scn latestTableDdlScn, OracleConnection connection) throws SQLException {
        final String query = "SELECT CURRENT_SCN FROM V$DATABASE";
        Scn currentScn;
        do {
            currentScn = connection.queryAndMap(query, rs -> rs.next() ? Scn.valueOf(rs.getString(1)) : Scn.NULL);
        } while (this.areSameTimestamp(latestTableDdlScn, currentScn, connection));
        return Optional.ofNullable(currentScn);
    }

    /**
     * Reads the current SCN together with every transaction in the given transaction view whose
     * start SCN is lower than it. The query is repeated, with {@code transactions} cleared before
     * each attempt, for as long as the inherited {@code areSameTimestamp} check reports {@code true}.
     *
     * @param latestTableDdlScn the latest table DDL SCN, may be {@code null}
     * @param connection the connection to query with
     * @param transactions output map of transaction id (hex) to start SCN; cleared and repopulated
     * @param transactionTableName the transaction view to join against
     * @return the current SCN, or empty if the query produced no rows
     * @throws SQLException if the query fails; the failure is logged before being rethrown
     */
    private Optional<Scn> getPendingTransactions(Scn latestTableDdlScn, OracleConnection connection, Map<String, Scn> transactions, String transactionTableName) throws SQLException {
        final String query = "SELECT d.CURRENT_SCN, t.XID, t.START_SCN "
                + "FROM V$DATABASE d "
                + "LEFT OUTER JOIN " + transactionTableName + " t "
                + "ON t.START_SCN < d.CURRENT_SCN ";
        Scn currentScn;
        do {
            // Reset per-attempt state.
            currentScn = null;
            transactions.clear();
            try (Statement s = connection.connection().createStatement();
                 ResultSet rs = s.executeQuery(query)) {
                while (rs.next()) {
                    if (currentScn == null) {
                        // CURRENT_SCN is repeated on every row; capture it once.
                        currentScn = Scn.valueOf(rs.getString(1));
                    }
                    final String pendingTxStartScn = rs.getString(3);
                    if (!Strings.isNullOrEmpty(pendingTxStartScn)) {
                        // The outer join matched a transaction row.
                        transactions.put(HexConverter.convertToHexString(rs.getBytes(2)), Scn.valueOf(pendingTxStartScn));
                    }
                }
            }
            catch (SQLException e) {
                LOGGER.warn("Could not query the {} view: {}", transactionTableName, e.getMessage(), e);
                throw e;
            }
        } while (this.areSameTimestamp(latestTableDdlScn, currentScn, connection));

        for (Map.Entry<String, Scn> transaction : transactions.entrySet()) {
            LOGGER.trace("\tPending Transaction '{}' started at SCN {}", transaction.getKey(), transaction.getValue());
        }
        return Optional.ofNullable(currentScn);
    }

    /**
     * Builds the snapshot offset for the given SCN. Unless the boundary mode is {@code SKIP} or
     * {@code TRANSACTION_VIEW_ONLY}, the transaction logs are mined first to add in-progress
     * transactions that are not yet in {@code pendingTransactions}.
     *
     * @param connectorConfig the connector configuration
     * @param connection the connection used for mining the transaction logs
     * @param currentScn the SCN used as both the offset SCN and the snapshot SCN
     * @param pendingTransactions in-progress transactions found so far; may be added to
     * @param transactionTableName the transaction view name, used in log messages only
     * @return the offset context
     * @throws SQLException if a database operation fails
     */
    private OracleOffsetContext determineSnapshotOffset(OracleConnectorConfig connectorConfig, OracleConnection connection, Scn currentScn, Map<String, Scn> pendingTransactions, String transactionTableName) throws SQLException {
        final boolean skipPending = this.isPendingTransactionSkip(connectorConfig);
        if (skipPending) {
            LOGGER.info("\tNo in-progress transactions will be captured.");
        } else if (this.isPendingTransactionViewOnly(connectorConfig)) {
            LOGGER.info("\tSkipping transaction logs for resolving snapshot offset, only using {}.", transactionTableName);
        } else {
            LOGGER.info("\tConsulting {} and transaction logs for resolving snapshot offset.", transactionTableName);
            this.getPendingTransactionsFromLogs(connection, currentScn, pendingTransactions);
        }

        if (!pendingTransactions.isEmpty()) {
            for (Map.Entry<String, Scn> entry : pendingTransactions.entrySet()) {
                LOGGER.info("\tFound in-progress transaction {}, starting at SCN {}", entry.getKey(), entry.getValue());
            }
        } else if (!skipPending) {
            LOGGER.info("\tFound no in-progress transactions.");
        }

        return OracleOffsetContext.create()
                .logicalName(connectorConfig)
                .scn(currentScn)
                .snapshotScn(currentScn)
                .snapshotPendingTransactions(pendingTransactions)
                .transactionContext(new TransactionContext())
                .incrementalSnapshotContext(new SignalBasedIncrementalSnapshotContext<TableId>())
                .build();
    }

    /**
     * Registers each of the given log files with the mining session, one statement per file.
     *
     * @param logs the log files to add
     * @param connection the connection owning the mining session
     * @throws SQLException if adding a log file fails
     */
    private void addLogsToSession(List<LogFile> logs, OracleConnection connection) throws SQLException {
        for (LogFile logFile : logs) {
            LOGGER.debug("\tAdding log: {}", logFile.getFileName());
            connection.executeWithoutCommitting(SqlUtils.addLogFileStatement("DBMS_LOGMNR.ADDFILE", logFile.getFileName()));
        }
    }

    /**
     * Starts a mining session with the {@code DICT_FROM_ONLINE_CATALOG} and
     * {@code NO_ROWID_IN_STMT} options.
     *
     * @param connection the connection to start the session on
     * @throws SQLException if the session cannot be started
     */
    private void startSession(OracleConnection connection) throws SQLException {
        final String query = "BEGIN sys.dbms_logmnr.start_logmnr(OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.NO_ROWID_IN_STMT);END;";
        LOGGER.debug("\tStarting mining session");
        connection.executeWithoutCommitting(query);
    }

    /**
     * Ends the mining session. An {@code ORA-01307} failure is treated as "session already
     * closed" and is logged rather than propagated; any other failure is rethrown.
     *
     * @param connection the connection owning the mining session
     * @throws SQLException if ending the session fails for any reason other than ORA-01307
     */
    private void stopSession(OracleConnection connection) throws SQLException {
        try {
            LOGGER.debug("\tStopping mining session");
            connection.executeWithoutCommitting("BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;");
        }
        catch (SQLException e) {
            final String message = e.getMessage();
            if (message != null && message.toUpperCase().contains("ORA-01307")) {
                // Nothing to stop. Swallowing this matters because the method runs in a finally
                // block, where rethrowing would hide the failure that prevented the session from starting.
                LOGGER.debug("LogMiner mining session is already closed.");
                return;
            }
            throw e;
        }
    }

    /**
     * Queries the oldest first-change SCN available in the logs, honouring the configured archive
     * log retention and archive destination name.
     *
     * @param config the connector configuration
     * @param connection the connection to query with
     * @return the oldest SCN, or {@link Scn#NULL} when no row or an empty value is returned
     * @throws SQLException if the query fails
     */
    private Scn getOldestScnAvailableInLogs(OracleConnectorConfig config, OracleConnection connection) throws SQLException {
        final Duration archiveLogRetention = config.getLogMiningArchiveLogRetention();
        final String archiveLogDestinationName = config.getLogMiningArchiveDestinationName();
        final String query = SqlUtils.oldestFirstChangeQuery(archiveLogRetention, archiveLogDestinationName);
        return connection.queryAndMap(query, rs -> {
            if (rs.next()) {
                final String value = rs.getString(1);
                if (!Strings.isNullOrEmpty(value)) {
                    return Scn.valueOf(value);
                }
            }
            return Scn.NULL;
        });
    }

    /**
     * Fetches the log files for the given offset SCN, sorted by ascending sequence.
     *
     * @param config the connector configuration
     * @param sinceScn the SCN to look up log files for
     * @param connection the connection to query with
     * @return the log files ordered by sequence
     * @throws SQLException if the lookup fails
     */
    private List<LogFile> getOrderedLogsFromScn(OracleConnectorConfig config, Scn sinceScn, OracleConnection connection) throws SQLException {
        return LogMinerHelper.getLogFilesForOffsetScn(
                        connection,
                        sinceScn,
                        config.getLogMiningArchiveLogRetention(),
                        config.isArchiveLogOnlyMode(),
                        config.getLogMiningArchiveDestinationName())
                .stream()
                .sorted(Comparator.comparing(LogFile::getSequence))
                .collect(Collectors.toList());
    }

    /**
     * Mines the most recent log files for transactions that started before {@code currentScn} and
     * have an {@code OPERATION_CODE=7} record at or after it, adding those not already present to
     * {@code pendingTransactions}. The mining session is always stopped afterwards.
     *
     * <p>NOTE(review): operation code 7 is presumably the commit marker - confirm against the
     * {@code V$LOGMNR_CONTENTS} documentation.
     *
     * @param connection the connection to mine with
     * @param currentScn the snapshot SCN
     * @param pendingTransactions map of transaction id (hex) to start SCN; newly found entries are added
     * @throws SQLException if the logs cannot be listed or the session cannot be stopped
     * @throws DebeziumException if adding logs, starting the session or querying fails
     */
    private void getPendingTransactionsFromLogs(OracleConnection connection, Scn currentScn, Map<String, Scn> pendingTransactions) throws SQLException {
        final Scn oldestScn = this.getOldestScnAvailableInLogs(this.connectorConfig, connection);
        final List<LogFile> logFiles = this.getOrderedLogsFromScn(this.connectorConfig, oldestScn, connection);
        if (logFiles.isEmpty()) {
            return;
        }
        try {
            this.addLogsToSession(this.getMostRecentLogFilesForSearch(logFiles), connection);
            this.startSession(connection);
            LOGGER.info("\tQuerying transaction logs, please wait...");
            connection.query("SELECT START_SCN, XID FROM V$LOGMNR_CONTENTS WHERE OPERATION_CODE=7 AND SCN >= " + currentScn + " AND START_SCN < " + currentScn, rs -> {
                while (rs.next()) {
                    final String transactionId = HexConverter.convertToHexString(rs.getBytes("XID"));
                    final String startScnStr = rs.getString("START_SCN");
                    if (Strings.isNullOrBlank(startScnStr)) {
                        continue;
                    }
                    final Scn startScn = Scn.valueOf(startScnStr);
                    if (!pendingTransactions.containsKey(transactionId)) {
                        LOGGER.info("\tTransaction '{}' started at SCN '{}'", transactionId, startScn);
                        pendingTransactions.put(transactionId, startScn);
                    }
                }
            });
        }
        catch (Exception e) {
            throw new DebeziumException("Failed to resolve snapshot offset", e);
        }
        finally {
            this.stopSession(connection);
        }
    }

    /**
     * Selects, per thread, the first log file flagged as current plus the highest-sequence log
     * file of the same thread with a lower sequence, if any. Threads without a current log file
     * contribute nothing.
     *
     * @param allLogFiles all candidate log files
     * @return the selected log files, grouped by thread
     */
    private List<LogFile> getMostRecentLogFilesForSearch(List<LogFile> allLogFiles) {
        final Map<Integer, List<LogFile>> recentLogsPerThread = new HashMap<>();
        for (LogFile logFile : allLogFiles) {
            final int thread = logFile.getThread();
            if (recentLogsPerThread.containsKey(thread) || !logFile.isCurrent()) {
                continue;
            }
            final List<LogFile> threadLogs = new ArrayList<>();
            threadLogs.add(logFile);
            // Also include the log immediately preceding the current one on this thread.
            allLogFiles.stream()
                    .filter(f -> thread == f.getThread() && logFile.getSequence().compareTo(f.getSequence()) > 0)
                    .max(Comparator.comparing(LogFile::getSequence))
                    .ifPresent(threadLogs::add);
            recentLogsPerThread.put(thread, threadLogs);
        }

        final List<LogFile> logs = new ArrayList<>();
        for (List<LogFile> threadLogs : recentLogsPerThread.values()) {
            logs.addAll(threadLogs);
        }
        return logs;
    }

    /**
     * @param config the connector configuration
     * @return {@code true} if the transaction snapshot boundary mode is {@code SKIP}
     */
    private boolean isPendingTransactionSkip(OracleConnectorConfig config) {
        return config.getLogMiningTransactionSnapshotBoundaryMode() == OracleConnectorConfig.TransactionSnapshotBoundaryMode.SKIP;
    }

    /**
     * @param config the connector configuration
     * @return {@code true} if the transaction snapshot boundary mode is {@code TRANSACTION_VIEW_ONLY}
     */
    public boolean isPendingTransactionViewOnly(OracleConnectorConfig config) {
        return config.getLogMiningTransactionSnapshotBoundaryMode() == OracleConnectorConfig.TransactionSnapshotBoundaryMode.TRANSACTION_VIEW_ONLY;
    }

    /**
     * Picks the transaction view to query: {@code GV$TRANSACTION} when RAC nodes are configured,
     * {@code V$TRANSACTION} otherwise.
     *
     * @param config the connector configuration
     * @return the transaction view name
     */
    private static String getTransactionTableName(OracleConnectorConfig config) {
        final boolean racConfigured = config.getRacNodes() != null && !config.getRacNodes().isEmpty();
        return racConfigured ? "GV$TRANSACTION" : "V$TRANSACTION";
    }
}

