/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer;

import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.RedoThreadState;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogFile;
import io.debezium.connector.oracle.logminer.LogFileNotFoundException;
import io.debezium.connector.oracle.logminer.SqlUtils;
import io.debezium.function.Predicates;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Strings;
import java.math.BigInteger;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects the archive and online redo logs that should be mined starting from a given SCN
 * and verifies that the collected set is consistent with the database's current redo thread state.
 *
 * <p>Logs are obtained by running the query built by {@link SqlUtils#allMinableLogsQuery} on the
 * supplied {@link OracleConnection}. Consistency is evaluated per redo thread: every thread reported
 * by {@link OracleConnection#getRedoThreadState()} must have a gap-free run of log sequences for the
 * SCN window that applies to its state (open, closed, or disabled).
 */
public class LogFileCollector {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogFileCollector.class);
    private static final String STATUS_CURRENT = "CURRENT";
    private static final String ONLINE_LOG_TYPE = "ONLINE";
    private static final String ARCHIVE_LOG_TYPE = "ARCHIVED";
    private final Duration initialDelay;
    private final Duration maxRetryDelay;
    private final int maxAttempts;
    private final Duration archiveLogRetention;
    private final boolean archiveLogOnlyMode;
    private final String archiveLogDestinationName;
    private final OracleConnection connection;

    /**
     * Creates a collector bound to the given connection.
     *
     * @param connectorConfig supplies the retry delays, the maximum number of log query retries and
     *                        the archive log settings used when querying for logs
     * @param connection the connection used to query logs and redo thread state
     */
    public LogFileCollector(OracleConnectorConfig connectorConfig, OracleConnection connection) {
        this.initialDelay = connectorConfig.getLogMiningInitialDelay();
        this.maxRetryDelay = connectorConfig.getLogMiningMaxDelay();
        this.maxAttempts = connectorConfig.getMaximumNumberOfLogQueryRetries();
        this.archiveLogRetention = connectorConfig.getArchiveLogRetention();
        this.archiveLogOnlyMode = connectorConfig.isArchiveLogOnlyMode();
        this.archiveLogDestinationName = connectorConfig.getArchiveLogDestinationName();
        this.connection = connection;
    }

    /**
     * Returns the logs to mine from the given SCN, retrying until the collected set is consistent.
     *
     * <p>The redo thread state and the log list are re-read on every attempt. Up to
     * {@code maxAttempts + 1} attempts are made, and the retry delay strategy is invoked after each
     * inconsistent attempt.
     *
     * @param offsetScn the SCN from which mining should start
     * @return the consistent list of logs, archive logs first followed by online redo logs
     * @throws SQLException if querying the database fails
     * @throws LogFileNotFoundException if no consistent set of logs was found within the allowed attempts
     */
    public List<LogFile> getLogs(Scn offsetScn) throws SQLException, LogFileNotFoundException {
        LOGGER.debug("Collecting logs based on the read SCN position {}.", offsetScn);
        DelayStrategy retryStrategy = DelayStrategy.exponential(this.initialDelay, this.maxRetryDelay);
        for (int attempt = 0; attempt <= this.maxAttempts; ++attempt) {
            // The thread state is captured before the logs are read; both feed the consistency check.
            RedoThreadState currentRedoThreadState = this.connection.getRedoThreadState();
            for (RedoThreadState.RedoThread redoThread : currentRedoThreadState.getThreads()) {
                LOGGER.debug("Thread {}: {}", redoThread.getThreadId(), redoThread);
            }
            List<LogFile> files = getLogsForOffsetScn(offsetScn);
            if (isLogFileListConsistent(offsetScn, files, currentRedoThreadState)) {
                return files;
            }
            LOGGER.info("No logs available yet (attempt {})...", attempt + 1);
            retryStrategy.sleepWhen(true);
        }
        throw new LogFileNotFoundException(offsetScn);
    }

    /**
     * Queries the database for all minable logs and keeps those relevant to the given SCN.
     *
     * <p>From each result row the file name (column 1), first SCN (2), next SCN (3), status (5),
     * type (6), sequence (7) and thread (10) are read. Archive logs are kept when their next SCN is
     * not before {@code offsetScn}; online redo logs are kept when they are current or their next SCN
     * is not before {@code offsetScn}. Rows of any other type are ignored.
     *
     * @param offsetScn the SCN from which mining should start
     * @return the de-duplicated logs, archive logs first followed by online redo logs
     * @throws SQLException if the query fails
     */
    public List<LogFile> getLogsForOffsetScn(Scn offsetScn) throws SQLException {
        // Insertion-ordered sets keep the query's row order while collapsing equal entries.
        Collection<LogFile> onlineRedoLogs = new LinkedHashSet<>();
        Collection<LogFile> archiveLogs = new LinkedHashSet<>();
        this.connection.query(getLogsQuery(offsetScn), rs -> {
            while (rs.next()) {
                String fileName = rs.getString(1);
                Scn firstScn = getScnFromString(rs.getString(2));
                Scn nextScn = getScnFromString(rs.getString(3));
                String status = rs.getString(5);
                String type = rs.getString(6);
                BigInteger sequence = new BigInteger(rs.getString(7));
                int thread = rs.getInt(10);
                if (ARCHIVE_LOG_TYPE.equals(type)) {
                    LogFile log = new LogFile(fileName, firstScn, nextScn, sequence, LogFile.Type.ARCHIVE, thread);
                    if (log.getNextScn().compareTo(offsetScn) >= 0) {
                        LOGGER.debug("Archive log {} with SCN range {} to {} sequence {} to be added.", fileName, firstScn, nextScn, sequence);
                        archiveLogs.add(log);
                    }
                }
                else if (ONLINE_LOG_TYPE.equals(type)) {
                    LogFile log = new LogFile(fileName, firstScn, nextScn, sequence, LogFile.Type.REDO, STATUS_CURRENT.equalsIgnoreCase(status), thread);
                    if (log.isCurrent() || log.getNextScn().compareTo(offsetScn) >= 0) {
                        LOGGER.debug("Online redo log {} with SCN range {} to {} ({}) sequence {} to be added.", fileName, firstScn, nextScn, status, sequence);
                        onlineRedoLogs.add(log);
                    }
                    else {
                        LOGGER.debug("Online redo log {} with SCN range {} to {} ({}) sequence {} to be excluded.", fileName, firstScn, nextScn, status, sequence);
                    }
                }
            }
        });
        return deduplicateLogFiles(archiveLogs, onlineRedoLogs);
    }

    /**
     * Removes every archive log that is {@code equals} to an online redo log and returns the
     * combined list.
     *
     * <p>Note that {@code archiveLogFiles} is modified in place, so it must be a mutable collection.
     *
     * @param archiveLogFiles the archive logs; duplicates of online logs are removed from this collection
     * @param onlineLogFiles the online redo logs
     * @return a new list holding the remaining archive logs followed by the online redo logs
     */
    public List<LogFile> deduplicateLogFiles(Collection<LogFile> archiveLogFiles, Collection<LogFile> onlineLogFiles) {
        for (LogFile redoLog : onlineLogFiles) {
            archiveLogFiles.removeIf(archiveLog -> {
                boolean duplicate = archiveLog.equals(redoLog);
                if (duplicate) {
                    LOGGER.debug("Removing redo thread {} archive log {} with duplicate sequence {} with redo log {}",
                            archiveLog.getThread(), archiveLog.getFileName(), archiveLog.getSequence(), redoLog.getFileName());
                }
                return duplicate;
            });
        }
        List<LogFile> allLogs = new ArrayList<>(archiveLogFiles);
        allLogs.addAll(onlineLogFiles);
        return allLogs;
    }

    /**
     * Checks whether the collected logs are consistent for every redo thread in the given state.
     *
     * <p>Logs belonging to a thread that is not part of {@code currentRedoThreadState} do not fail
     * the check; a warning is logged once per such thread instead.
     *
     * @param startScn the SCN from which mining should start
     * @param logs the collected logs
     * @param currentRedoThreadState the redo thread state to validate against
     * @return {@code true} if every known redo thread passed its check, {@code false} otherwise
     */
    public boolean isLogFileListConsistent(Scn startScn, List<LogFile> logs, RedoThreadState currentRedoThreadState) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Performing consistency check on the following collected logs:");
            for (LogFile logFile : logs) {
                LOGGER.debug("\tLog: {}", logFile);
            }
            LOGGER.debug("Current redo thread state:");
            for (RedoThreadState.RedoThread redoThread : currentRedoThreadState.getThreads()) {
                LOGGER.debug("\tThread: {}", redoThread);
            }
        }
        Map<Integer, List<LogFile>> redoThreadLogs = logs.stream().collect(Collectors.groupingBy(LogFile::getThread));
        List<Integer> currentThreads = currentRedoThreadState.getThreads().stream()
                .map(RedoThreadState.RedoThread::getThreadId)
                .collect(Collectors.toList());
        for (Integer threadId : currentThreads) {
            RedoThreadState.RedoThread redoThread = currentRedoThreadState.getRedoThread(threadId);
            // May be null when no log was collected for this thread; the per-thread checks handle that.
            List<LogFile> threadLogs = redoThreadLogs.get(threadId);
            boolean consistent = redoThread.isOpen()
                    ? isOpenThreadConsistent(redoThread, startScn, threadLogs)
                    : isClosedThreadConsistent(redoThread, startScn, threadLogs);
            if (!consistent) {
                return false;
            }
        }
        // distinct() so that a thread with several logs produces a single warning.
        logs.stream()
                .map(LogFile::getThread)
                .filter(Predicates.not(currentThreads::contains))
                .distinct()
                .forEach(this::logThreadCheckSkippedNotInDatabase);
        return true;
    }

    /**
     * Checks the logs of an open redo thread.
     *
     * <p>An open thread must not be disabled and must have at least one log. If the thread was
     * enabled after {@code startScn}, only the logs at or after the enabled SCN must be gap-free;
     * otherwise all of the thread's logs must be gap-free, and if none of them contains
     * {@code startScn} the archive log preceding the lowest collected sequence must exist.
     *
     * @param thread the open redo thread
     * @param startScn the SCN from which mining should start
     * @param threadLogs the logs collected for this thread; may be {@code null}
     * @return {@code true} if the thread's logs are consistent
     */
    private boolean isOpenThreadConsistent(RedoThreadState.RedoThread thread, Scn startScn, List<LogFile> threadLogs) {
        int threadId = thread.getThreadId();
        Scn enabledScn = thread.getEnabledScn();
        Scn checkpointScn = thread.getCheckpointScn();
        if (thread.isDisabled()) {
            logException(String.format("Redo thread %d expected to have ENABLED with value PUBLIC or PRIVATE.", threadId));
            return false;
        }
        if (threadLogs == null || threadLogs.isEmpty()) {
            logException(String.format("Redo thread %d is inconsistent; enabled SCN %s checkpoint SCN %s reading from SCN %s, no logs found.", threadId, enabledScn, checkpointScn, startScn));
            return false;
        }
        if (enabledScn.compareTo(startScn) > 0) {
            List<LogFile> enabledLogs = threadLogs.stream()
                    .filter(log -> log.isScnInLogFileRange(enabledScn) || log.getFirstScn().compareTo(enabledScn) > 0)
                    .collect(Collectors.toList());
            if (enabledLogs.isEmpty()) {
                // Report the enabled SCN; the filtered list is empty here and carries no information.
                logException(String.format("Redo Thread %d is inconsistent; expected logs after enabled SCN %s", threadId, enabledScn));
                return false;
            }
            Optional<Long> missingSequence = getFirstLogMissingSequence(enabledLogs);
            if (missingSequence.isPresent()) {
                logException(String.format("Redo Thread %d is inconsistent; failed to find log with sequence %d (enabled).", threadId, missingSequence.get()));
                return false;
            }
            LOGGER.debug("Redo Thread {} is consistent after enabled SCN {} ({}).", threadId, enabledScn, thread.getStatus());
            return true;
        }

        boolean startScnCovered = threadLogs.stream().anyMatch(log -> log.isScnInLogFileRange(startScn));
        if (!startScnCovered) {
            // No collected log contains the start SCN; require the archive log just before the
            // lowest collected sequence to exist.
            long priorSequence = getMinRedoThreadLogSequence(threadLogs).longValue() - 1L;
            if (this.connection.getArchiveLogFile(threadId, priorSequence) == null) {
                logException(String.format("Redo Thread %d is inconsistent; failed to find archive log with sequence %d", threadId, priorSequence));
                return false;
            }
        }
        Optional<Long> missingSequence = getFirstLogMissingSequence(threadLogs);
        if (missingSequence.isPresent()) {
            logException(String.format("Redo Thread %d is inconsistent; failed to find log with sequence %d", threadId, missingSequence.get()));
            return false;
        }
        LOGGER.debug("Redo Thread {} is consistent.", threadId);
        return true;
    }

    /**
     * Checks the logs of a redo thread that is not open, dispatching on whether it is disabled.
     *
     * @param thread the closed redo thread
     * @param startScn the SCN from which mining should start
     * @param threadLogs the logs collected for this thread; may be {@code null}
     * @return {@code true} if the thread's logs are consistent
     */
    private boolean isClosedThreadConsistent(RedoThreadState.RedoThread thread, Scn startScn, List<LogFile> threadLogs) {
        return thread.isDisabled()
                ? isDisabledThreadConsistent(thread, startScn, threadLogs)
                : isClosedEnabledThreadConsistent(thread, startScn, threadLogs);
    }

    /**
     * Checks a closed thread that is still enabled: any collected logs must lie within the window
     * bounded by the checkpoint SCN (and the enabled SCN when it is after {@code startScn}) and be
     * gap-free. A thread without collected logs is considered consistent.
     */
    private boolean isClosedEnabledThreadConsistent(RedoThreadState.RedoThread thread, Scn startScn, List<LogFile> threadLogs) {
        int threadId = thread.getThreadId();
        Scn checkpointScn = thread.getCheckpointScn();
        Scn enabledScn = thread.getEnabledScn();
        if (threadLogs != null && !threadLogs.isEmpty()) {
            if (checkpointScn.compareTo(startScn) < 0) {
                if (LOGGER.isDebugEnabled()) {
                    for (LogFile logFile : threadLogs) {
                        LOGGER.debug("Read Thread {} query has log {}; not expected.", threadId, logFile);
                    }
                }
                logException(String.format("Redo Thread %d stopped at SCN %s, but logs detected using SCN %s.", threadId, checkpointScn, startScn));
                return false;
            }
            List<LogFile> logsToCheck;
            if (enabledScn.compareTo(startScn) > 0) {
                logsToCheck = threadLogs.stream()
                        .filter(log -> log.isScnInLogFileRange(enabledScn) || log.getNextScn().compareTo(enabledScn) >= 0)
                        .filter(log -> log.isScnInLogFileRange(checkpointScn) || log.getFirstScn().compareTo(checkpointScn) < 0)
                        .collect(Collectors.toList());
                if (logsToCheck.isEmpty()) {
                    logException(String.format("Redo Thread %d is inconsistent; expected logs between enabled SCN %s and checkpoint SCN %s", threadId, enabledScn, checkpointScn));
                    return false;
                }
            }
            else {
                logsToCheck = threadLogs.stream()
                        .filter(log -> log.isScnInLogFileRange(checkpointScn) || log.getFirstScn().compareTo(checkpointScn) < 0)
                        .collect(Collectors.toList());
                if (logsToCheck.isEmpty()) {
                    logException(String.format("Redo Thread %d is inconsistent; expected logs before checkpoint SCN %s", threadId, checkpointScn));
                    return false;
                }
            }
            Optional<Long> missingSequence = getFirstLogMissingSequence(logsToCheck);
            if (missingSequence.isPresent()) {
                logException(String.format("Redo Thread %d is inconsistent; failed to find log with sequence %d (checkpoint).", threadId, missingSequence.get()));
                return false;
            }
        }
        LOGGER.debug("Redo Thread {} is consistent before checkpoint SCN {} ({}).", threadId, checkpointScn, thread.getStatus());
        return true;
    }

    /**
     * Checks a disabled thread: the check is skipped when the thread has no (or a zero) disabled
     * SCN; otherwise any collected logs must start before the disabled SCN and be gap-free.
     * A thread without collected logs is considered consistent.
     */
    private boolean isDisabledThreadConsistent(RedoThreadState.RedoThread thread, Scn startScn, List<LogFile> threadLogs) {
        int threadId = thread.getThreadId();
        Scn disabledScn = thread.getDisabledScn();
        if (disabledScn.isNull() || disabledScn.asBigInteger().equals(BigInteger.ZERO)) {
            LOGGER.debug("Redo Thread {} is disabled but has no disabled SCN; consistency check skipped.", threadId);
            return true;
        }
        if (threadLogs != null && !threadLogs.isEmpty()) {
            if (disabledScn.compareTo(startScn) < 0) {
                if (LOGGER.isDebugEnabled()) {
                    for (LogFile logFile : threadLogs) {
                        LOGGER.debug("Redo Thread {} log {} not expected.", threadId, logFile);
                    }
                }
                logException(String.format("Redo Thread %d disabled at SCN %s, but logs detected using SCN %s.", threadId, disabledScn, startScn));
                return false;
            }
            List<LogFile> disabledLogs = threadLogs.stream()
                    .filter(log -> log.isScnInLogFileRange(disabledScn) || log.getFirstScn().compareTo(disabledScn) < 0)
                    .collect(Collectors.toList());
            if (disabledLogs.isEmpty()) {
                // Report the disabled SCN; the filtered list is empty here and carries no information.
                logException(String.format("Redo Thread %d is inconsistent; expected logs before disabled SCN %s.", threadId, disabledScn));
                return false;
            }
            Optional<Long> missingSequence = getFirstLogMissingSequence(disabledLogs);
            if (missingSequence.isPresent()) {
                logException(String.format("Redo Thread %d is inconsistent; failed to find log with sequence %d.", threadId, missingSequence.get()));
                return false;
            }
        }
        LOGGER.debug("Redo Thread {} is consistent after disabled SCN {} ({}).", threadId, disabledScn, thread.getStatus());
        return true;
    }

    /**
     * Finds the lowest sequence number between the minimum and maximum sequence of the given logs
     * for which no log is present.
     *
     * @param logFiles the logs to inspect; must not be empty
     * @return the first missing sequence, or an empty optional if the sequences are contiguous
     * @throws DebeziumException if {@code logFiles} is empty
     */
    private Optional<Long> getFirstLogMissingSequence(List<LogFile> logFiles) {
        SequenceRange range = getSequenceRangeForRedoThreadLogs(logFiles);
        // Collect the sequences once so each lookup below is constant-time rather than a list scan.
        Collection<Long> presentSequences = logFiles.stream()
                .map(logFile -> logFile.getSequence().longValue())
                .collect(Collectors.toSet());
        for (long sequence = range.getMin(); sequence <= range.getMax(); ++sequence) {
            if (!presentSequences.contains(sequence)) {
                return Optional.of(sequence);
            }
        }
        return Optional.empty();
    }

    /** Warns that logs exist for a redo thread that is absent from the current redo thread state. */
    private void logThreadCheckSkippedNotInDatabase(int threadId) {
        LOGGER.warn("Log found for redo thread {} but no record in V$THREAD; thread consistency check skipped.", threadId);
    }

    /** Builds the minable-logs query for the given SCN using this collector's archive log settings. */
    private String getLogsQuery(Scn offsetScn) {
        return SqlUtils.allMinableLogsQuery(offsetScn, this.archiveLogRetention, this.archiveLogOnlyMode, this.archiveLogDestinationName);
    }

    /** Parses an SCN from its string form; a {@code null} or blank value maps to {@link Scn#MAX}. */
    private Scn getScnFromString(String value) {
        if (Strings.isNullOrBlank(value)) {
            return Scn.MAX;
        }
        return Scn.valueOf(value);
    }

    /**
     * Computes the minimum and maximum sequence (as {@code long}) over the given logs.
     *
     * @throws DebeziumException if the list is empty
     */
    private SequenceRange getSequenceRangeForRedoThreadLogs(List<LogFile> redoThreadLogs) {
        if (redoThreadLogs.isEmpty()) {
            throw new DebeziumException("Cannot calculate log sequence range, log collection is empty.");
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (LogFile logFile : redoThreadLogs) {
            long sequence = logFile.getSequence().longValue();
            min = Math.min(sequence, min);
            max = Math.max(sequence, max);
        }
        return new SequenceRange(min, max);
    }

    /**
     * Returns the smallest sequence among the given logs.
     *
     * @throws DebeziumException if the list is {@code null} or empty
     */
    private BigInteger getMinRedoThreadLogSequence(List<LogFile> redoThreadLogs) {
        if (redoThreadLogs == null || redoThreadLogs.isEmpty()) {
            throw new DebeziumException("Cannot calculate minimum sequence on a null or empty list of logs");
        }
        // The list was verified non-empty above, so a minimum is always present.
        return redoThreadLogs.stream().map(LogFile::getSequence).min(BigInteger::compareTo).get();
    }

    /**
     * Logs the message at INFO level, passing a freshly created {@link DebeziumException} as the
     * trailing logger argument. The exception is only logged, never thrown.
     */
    private static void logException(String message) {
        LOGGER.info("{}", message, new DebeziumException(message));
    }

    /** Inclusive range of log sequence numbers. */
    @Immutable
    private static final class SequenceRange {
        private final long min;
        private final long max;

        SequenceRange(long min, long max) {
            this.min = min;
            this.max = max;
        }

        public long getMin() {
            return this.min;
        }

        public long getMax() {
            return this.max;
        }
    }
}

