/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.xstream;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleSchemaChangeEventEmitter;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.xstream.ChunkColumnValues;
import io.debezium.connector.oracle.xstream.LcrPosition;
import io.debezium.connector.oracle.xstream.XStreamChangeRecordEmitter;
import io.debezium.connector.oracle.xstream.XStreamStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.xstream.XstreamStreamingChangeEventSource;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import java.sql.SQLException;
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import oracle.streams.ChunkColumnValue;
import oracle.streams.DDLLCR;
import oracle.streams.DefaultRowLCR;
import oracle.streams.LCR;
import oracle.streams.RowLCR;
import oracle.streams.StreamsException;
import oracle.streams.XStreamLCRCallbackHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * XStream callback handler that receives logical change records (LCRs) and column chunk
 * data and forwards them to the {@link EventDispatcher} as transaction-committed,
 * schema change and data change events, updating the {@link OracleOffsetContext} as it goes.
 */
class LcrEventHandler
implements XStreamLCRCallbackHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(LcrEventHandler.class);
    // Connector configuration: consulted for table filters and the PDB name, and used to open metadata connections.
    private final OracleConnectorConfig connectorConfig;
    // Receives any non-interrupt failure raised while an LCR is being processed.
    private final ErrorHandler errorHandler;
    // Sink for all transaction, schema change and data change events produced by this handler.
    private final EventDispatcher<OraclePartition, TableId> dispatcher;
    // Handed to every change record emitter created by this handler.
    private final Clock clock;
    // Source of table definitions and of the LOB columns of a table.
    private final OracleDatabaseSchema schema;
    // Partition passed along with every dispatched event.
    private final OraclePartition partition;
    // Offset state updated for every LCR that is not skipped.
    private final OracleOffsetContext offsetContext;
    // When true, table ids are built with a lower-cased object name.
    private final boolean tablenameCaseInsensitive;
    // Provides the XStream outbound handle and the published position used to set the low watermark.
    private final XstreamStreamingChangeEventSource eventSource;
    // Metrics: warning counter, also passed to schema change emitters.
    private final XStreamStreamingChangeEventSourceMetrics streamingMetrics;
    // Chunks collected for the row being assembled, keyed by column name; cleared at the start
    // of every processLCR call and after a chunked row has been resolved.
    private final Map<String, ChunkColumnValues> columnChunks;
    // Most recent row LCR that reported chunk data; dispatched once its end-of-row chunk arrives.
    private RowLCR currentRow;

    /**
     * Creates a handler bound to one partition and one offset context.
     *
     * @param connectorConfig connector configuration (table filters, PDB name, metadata connections)
     * @param errorHandler receives failures raised while processing an LCR
     * @param dispatcher sink for the events produced by this handler
     * @param clock clock handed to the change record emitters
     * @param schema relational schema used for table and LOB column lookups
     * @param partition partition passed along with every dispatched event
     * @param offsetContext offset state updated for every processed LCR
     * @param tablenameCaseInsensitive whether table ids are built with lower-cased object names
     * @param eventSource provides the XStream outbound handle and published positions
     * @param streamingMetrics streaming metrics (warning counter, schema change emitters)
     */
    LcrEventHandler(
            OracleConnectorConfig connectorConfig,
            ErrorHandler errorHandler,
            EventDispatcher<OraclePartition, TableId> dispatcher,
            Clock clock,
            OracleDatabaseSchema schema,
            OraclePartition partition,
            OracleOffsetContext offsetContext,
            boolean tablenameCaseInsensitive,
            XstreamStreamingChangeEventSource eventSource,
            XStreamStreamingChangeEventSourceMetrics streamingMetrics) {
        // Collaborators used to emit events.
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.streamingMetrics = streamingMetrics;
        this.eventSource = eventSource;

        // Configuration and schema state.
        this.connectorConfig = connectorConfig;
        this.schema = schema;
        this.tablenameCaseInsensitive = tablenameCaseInsensitive;

        // Position tracking.
        this.partition = partition;
        this.offsetContext = offsetContext;

        // Insertion-ordered so chunked columns are resolved in the order they were first seen.
        this.columnChunks = new LinkedHashMap<>();
    }

    /**
     * Handles one logical change record delivered by XStream.
     *
     * <p>The low watermark is updated first and any chunk data left over from a previous
     * record is dropped. Records whose position is not beyond the position already stored
     * in the offset context are ignored. Otherwise the offset context is advanced and the
     * record is routed to row or DDL handling; other LCR types only advance the offsets.
     *
     * <p>An {@link InterruptedException} is treated as a stop signal: the interrupt status
     * is restored and the method returns normally. Every other exception is handed to the
     * error handler instead of being thrown.
     *
     * @param lcr the change record to process
     * @throws StreamsException declared by the callback interface
     */
    @Override
    public void processLCR(LCR lcr) throws StreamsException {
        LOGGER.trace("Received LCR {}", lcr);
        try {
            setWatermark();
            columnChunks.clear();

            final LcrPosition lcrPosition = new LcrPosition(lcr.getPosition());
            final LcrPosition recordedPosition = LcrPosition.valueOf(offsetContext.getLcrPosition());

            // Without a recorded position nothing can have been processed yet, so the
            // duplicate check only applies when one exists.
            if (recordedPosition != null && lcrPosition.compareTo(recordedPosition) <= 0) {
                // Arguments follow the "SCN/LCR Position" order announced by the message.
                LOGGER.debug("Ignoring change event with already processed SCN/LCR Position {}/{}, last recorded {}/{}",
                        lcrPosition.getScn(), lcrPosition, recordedPosition.getScn(), recordedPosition);
                return;
            }

            offsetContext.setRowId("");
            offsetContext.setScn(lcrPosition.getScn());
            offsetContext.setEventCommitScn(lcrPosition.getCommitScn());
            offsetContext.setEventScn(lcrPosition.getScn());
            offsetContext.setLcrPosition(lcrPosition.toString());
            offsetContext.setTransactionId(lcr.getTransactionId());
            offsetContext.tableEvent(
                    new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName()),
                    lcr.getSourceTime().timestampValue().toInstant());

            if (lcr instanceof RowLCR) {
                processRowLCR((RowLCR) lcr);
            }
            else if (lcr instanceof DDLLCR) {
                dispatchSchemaChangeEvent((DDLLCR) lcr);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so the caller's event loop can observe the stop request;
            // Thread.interrupted() would clear it instead.
            Thread.currentThread().interrupt();
            LOGGER.info("Received signal to stop, event loop will halt");
        }
        catch (Exception e) {
            // Callback boundary: report the failure instead of throwing into the XStream client.
            errorHandler.setProducerThrowable(e);
        }
    }

    /**
     * Routes a row LCR: LOB erase operations are skipped with a warning, rows that report
     * chunk data are parked until their chunks arrive, and all other rows are dispatched
     * immediately.
     *
     * @param row the row LCR to handle
     * @throws InterruptedException if dispatching the event is interrupted
     */
    private void processRowLCR(RowLCR row) throws InterruptedException {
        // Constant-first comparison: safe when the command type is null and consistent with
        // the "COMMIT" check performed when dispatching data change events.
        if ("LOB ERASE".equals(row.getCommandType())) {
            LOGGER.warn("LOB_ERASE for table '{}' is not supported, use DML operations to manipulate LOB columns only.", row.getObjectName());
            return;
        }

        if (row.hasChunkData()) {
            // Dispatch is deferred until the end-of-row chunk has been received.
            currentRow = row;
            return;
        }

        dispatchDataChangeEvent(row, null);
    }

    /**
     * Dispatches the event described by a row LCR.
     *
     * <p>A {@code COMMIT} command produces a transaction-committed event. For any other
     * command the table is resolved (registering it through a schema change event when it
     * is unknown but included by the filters) and a data change event is dispatched. Every
     * LOB column of the table that has no supplied chunk value is filled with
     * {@link OracleValueConverters#UNAVAILABLE_VALUE}; the "old" values of all LOB columns
     * are always that placeholder.
     *
     * @param lcr the row LCR to dispatch
     * @param chunkValues resolved chunk values keyed by column name, or {@code null} when the
     *            row carried no chunk data; a supplied map is completed in place
     * @throws InterruptedException if dispatching is interrupted
     */
    private void dispatchDataChangeEvent(RowLCR lcr, Map<String, Object> chunkValues) throws InterruptedException {
        LOGGER.debug("Processing DML event {}", lcr);

        if ("COMMIT".equals(lcr.getCommandType())) {
            dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, lcr.getSourceTime().timestampValue().toInstant());
            return;
        }

        final TableId tableId = getTableId(lcr);
        final Table table = lookupOrRegisterTable(tableId);
        if (table == null) {
            // Excluded, non-relational, or still unknown after the schema change: nothing to emit.
            return;
        }

        final Map<String, Object> oldChunkValues = new HashMap<>(0);
        final Map<String, Object> newChunkValues = chunkValues != null ? chunkValues : new HashMap<>(0);
        for (Column lobColumn : schema.getLobColumnsForTable(table.id())) {
            final String columnName = lobColumn.name();
            oldChunkValues.put(columnName, OracleValueConverters.UNAVAILABLE_VALUE);
            if (!newChunkValues.containsKey(columnName)) {
                LOGGER.trace("\tColumn '{}' not supplied, initialized with unavailable value", columnName);
                newChunkValues.put(columnName, OracleValueConverters.UNAVAILABLE_VALUE);
            }
        }

        final Object rowId = lcr.getAttribute("ROW_ID");
        if (rowId != null) {
            offsetContext.setRowId(rowId.toString());
        }

        final XStreamChangeRecordEmitter emitter = new XStreamChangeRecordEmitter(
                connectorConfig, partition, offsetContext, lcr, oldChunkValues, newChunkValues,
                schema.tableFor(tableId), schema, clock);
        dispatcher.dispatchDataChangeEvent(partition, tableId, emitter);
    }

    /**
     * Returns the schema entry for {@code tableId}, registering the table through a schema
     * change event when it is not yet known.
     *
     * @param tableId the table to resolve
     * @return the table, or {@code null} when it is excluded by the table filters, is reported
     *         as non-relational, or is still unknown after the schema change event
     * @throws InterruptedException if dispatching the schema change event is interrupted
     */
    private Table lookupOrRegisterTable(TableId tableId) throws InterruptedException {
        final Table known = schema.tableFor(tableId);
        if (known != null) {
            return known;
        }

        if (!connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
            LOGGER.trace("Table {} is new but excluded, schema change skipped.", tableId);
            return null;
        }

        LOGGER.warn("Obtaining schema for table {}, which should be already loaded, this may signal potential bug in fetching table schemas.", tableId);
        final String tableDdl;
        try {
            tableDdl = getTableMetadataDdl(tableId);
        }
        catch (OracleConnection.NonRelationalTableException e) {
            LOGGER.warn("{} The event will be skipped.", e.getMessage());
            streamingMetrics.incrementWarningCount();
            return null;
        }

        LOGGER.info("Table {} will be captured.", tableId);
        dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableId,
                new OracleSchemaChangeEventEmitter(connectorConfig, partition, offsetContext, tableId,
                        tableId.catalog(), tableId.schema(), tableDdl, schema, Instant.now(), streamingMetrics, null));

        // The schema change event is expected to have registered the table.
        return schema.tableFor(tableId);
    }

    /**
     * Dispatches a schema change event for a DDL LCR. The emitter is given a callback that
     * runs {@code processTruncateEvent} for the same LCR.
     *
     * @param ddlLcr the DDL change record
     * @throws InterruptedException if dispatching is interrupted
     */
    private void dispatchSchemaChangeEvent(DDLLCR ddlLcr) throws InterruptedException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Processing DDL event {}", ddlLcr.getDDLText());
        }

        final TableId tableId = getTableId(ddlLcr);
        final OracleSchemaChangeEventEmitter emitter = new OracleSchemaChangeEventEmitter(
                connectorConfig,
                partition,
                offsetContext,
                tableId,
                ddlLcr.getSourceDatabaseName(),
                ddlLcr.getObjectOwner(),
                ddlLcr.getDDLText(),
                schema,
                ddlLcr.getSourceTime().timestampValue().toInstant(),
                streamingMetrics,
                () -> processTruncateEvent(ddlLcr));

        dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, tableId, emitter);
    }

    /**
     * Turns a DDL LCR into a synthetic row LCR carrying the same database, command type,
     * owner, object name, transaction id, tag, position and source time, and dispatches it
     * as a data change event without chunk values.
     *
     * @param ddlLcr the DDL change record to convert
     * @throws RuntimeException if dispatching is interrupted; the interrupt status of the
     *             current thread is restored before the exception is thrown
     */
    private void processTruncateEvent(DDLLCR ddlLcr) {
        LOGGER.debug("Handling truncate event");
        final DefaultRowLCR rowLCR = new DefaultRowLCR(
                ddlLcr.getSourceDatabaseName(),
                ddlLcr.getCommandType(),
                ddlLcr.getObjectOwner(),
                ddlLcr.getObjectName(),
                ddlLcr.getTransactionId(),
                ddlLcr.getTag(),
                ddlLcr.getPosition(),
                ddlLcr.getSourceTime());
        try {
            dispatchDataChangeEvent(rowLCR, null);
        }
        catch (InterruptedException e) {
            // The checked exception cannot cross the callback, so keep the interrupt visible
            // to callers via the thread's interrupt status before wrapping it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted", e);
        }
    }

    /**
     * Builds the table id for an LCR from its source database name, object owner and
     * object name; the object name is lower-cased when table names are configured as
     * case-insensitive.
     *
     * @param lcr the change record identifying the table
     * @return the table id for the record
     */
    private TableId getTableId(LCR lcr) {
        // NOTE(review): toLowerCase() uses the default locale — confirm this matches how
        // table names are lower-cased elsewhere before switching to a fixed locale.
        return new TableId(
                lcr.getSourceDatabaseName(),
                lcr.getObjectOwner(),
                tablenameCaseInsensitive ? lcr.getObjectName().toLowerCase() : lcr.getObjectName());
    }

    /**
     * Fetches the DDL of a table over a dedicated, short-lived connection. When a PDB name
     * is configured the session is switched to that PDB first. The connection is always
     * closed, whether the lookup succeeds or fails.
     *
     * @param tableId the table whose DDL is requested
     * @return the DDL text returned by the connection
     * @throws OracleConnection.NonRelationalTableException propagated from the metadata lookup
     * @throws DebeziumException if any step, including closing the connection, fails with a
     *             {@link SQLException}
     */
    private String getTableMetadataDdl(TableId tableId) throws OracleConnection.NonRelationalTableException {
        LOGGER.info("Getting database metadata for table '{}'", tableId);
        final String pdbName = connectorConfig.getPdbName();

        // try-with-resources closes the connection on every path and routes a failure of
        // close() through the same SQLException handling as the lookup itself.
        try (OracleConnection connection = new OracleConnection(connectorConfig)) {
            if (!Strings.isNullOrBlank(pdbName)) {
                connection.setSessionToPdb(pdbName);
            }
            connection.setAutoCommit(false);
            return connection.getTableMetadataDdl(tableId);
        }
        catch (SQLException e) {
            throw new DebeziumException("Failed to get table DDL metadata for: " + tableId, e);
        }
    }

    /**
     * Reports the most recently published position to Oracle as the processed low watermark.
     *
     * <p>Does nothing when there is no XStream outbound handle or no published position.
     * The LCR position is preferred; the SCN is used only when no position is available.
     *
     * @throws DebeziumException if the XStream client rejects the watermark update
     */
    private void setWatermark() {
        if (eventSource.getXsOut() == null) {
            return;
        }

        try {
            final XstreamStreamingChangeEventSource.PositionAndScn published = eventSource.receivePublishedPosition();
            if (published == null) {
                return;
            }

            LOGGER.debug("Recording offsets to Oracle");

            final boolean hasPosition = published.position != null;
            if (!hasPosition && published.scn == null) {
                LOGGER.warn("Nothing in offsets could be recorded to Oracle");
                return;
            }

            if (hasPosition) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Recording position {}", published.position);
                }
                eventSource.getXsOut().setProcessedLowWatermark(published.position.getRawPosition(), 0);
            }
            else {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Recording position with SCN {}", published.scn);
                }
                eventSource.getXsOut().setProcessedLowWatermark(published.scn, 0);
            }

            LOGGER.trace("Offsets recorded to Oracle");
        }
        catch (StreamsException e) {
            throw new DebeziumException("Couldn't set processed low watermark", e);
        }
    }

    /**
     * Collects one column chunk under its column name and, when the chunk marks the end of
     * the row, resolves the collected chunks and dispatches the pending row.
     *
     * @param chunk the chunk delivered by XStream
     * @throws StreamsException declared by the callback interface
     */
    @Override
    public void processChunk(ChunkColumnValue chunk) throws StreamsException {
        final ChunkColumnValues chunksForColumn = columnChunks.computeIfAbsent(chunk.getColumnName(), columnName -> new ChunkColumnValues());
        chunksForColumn.add(chunk);

        if (chunk.isEndOfRow()) {
            resolveAndDispatchCurrentChunkedRow();
        }
    }

    /**
     * Not supported by this handler.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public LCR createLCR() throws StreamsException {
        throw new UnsupportedOperationException("Should never be called");
    }

    /**
     * Not supported by this handler.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public ChunkColumnValue createChunk() throws StreamsException {
        throw new UnsupportedOperationException("Should never be called");
    }

    /**
     * Resolves the chunks collected for the pending row into one value per column and
     * dispatches the row as a data change event.
     *
     * <p>The pending row is consumed by this call. If no row is pending — for example because
     * the LCR the chunks belong to was skipped — the collected chunks are discarded rather
     * than being attached to a previously dispatched row.
     *
     * <p>Chunk type codes 1 and 3 are resolved as strings, 4 as XML and 2 and 23 as byte
     * arrays; any other code is ignored. NOTE(review): the codes presumably correspond to
     * the {@code oracle.streams.ChunkColumnValue} type constants — confirm against that API.
     *
     * <p>An {@link InterruptedException} is treated as a stop signal: the interrupt status
     * is restored and the method returns normally.
     *
     * @throws DebeziumException if a chunk value cannot be resolved
     */
    private void resolveAndDispatchCurrentChunkedRow() {
        // Take ownership of the pending row so it can never be dispatched a second time.
        final RowLCR row = currentRow;
        currentRow = null;

        if (row == null) {
            LOGGER.debug("Discarding chunk data for {} column(s), no row is awaiting chunk data", columnChunks.size());
            columnChunks.clear();
            return;
        }

        try {
            final Map<String, Object> resolvedChunkValues = new HashMap<>();
            for (Map.Entry<String, ChunkColumnValues> entry : columnChunks.entrySet()) {
                final String columnName = entry.getKey();
                final ChunkColumnValues chunkValues = entry.getValue();
                if (chunkValues.isEmpty()) {
                    LOGGER.trace("Column '{}' has no chunk values.", columnName);
                    continue;
                }

                final int type = chunkValues.getChunkType();
                switch (type) {
                    case 1:
                    case 3:
                        resolvedChunkValues.put(columnName, chunkValues.getStringValue());
                        break;
                    case 4:
                        resolvedChunkValues.put(columnName, chunkValues.getXmlValue());
                        break;
                    case 2:
                    case 23:
                        resolvedChunkValues.put(columnName, chunkValues.getByteArray());
                        break;
                    default:
                        LOGGER.trace("Received an unsupported chunk type '{}' for column '{}', ignored.", type, columnName);
                        break;
                }
            }

            columnChunks.clear();
            dispatchDataChangeEvent(row, resolvedChunkValues);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so the caller's event loop can observe the stop request;
            // Thread.interrupted() would clear it instead.
            Thread.currentThread().interrupt();
            LOGGER.info("Received signal to stop, event loop will halt");
        }
        catch (SQLException e) {
            throw new DebeziumException("Failed to process chunk data", e);
        }
    }
}

