/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.signal;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.document.Array;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.pipeline.signal.SignalPayload;
import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.pipeline.signal.actions.SignalAction;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
import io.debezium.pipeline.signal.channels.SourceSignalChannel;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.AbstractPartition;
import io.debezium.util.Threads;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads signals from the enabled {@link SignalChannelReader}s and dispatches each of them to the
 * {@link SignalAction} registered for the signal type.
 *
 * <p>Channel reads are guarded by a single-permit {@link Semaphore}, so the scheduled
 * {@link #process()} and an explicit {@link #processSourceSignal(Partition)} call do not read and
 * dispatch signals at the same time.
 *
 * @param <P> the partition type
 * @param <O> the offset context type
 */
public class SignalProcessor<P extends Partition, O extends OffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SignalProcessor.class);

    /** Maximum time, in seconds, to wait for the processing semaphore. */
    public static final int SEMAPHORE_WAIT_TIME = 10;
    /** Name of the signal data field holding the list of data collections. */
    public static final String DATA_COLLECTIONS_FIELD_NAME = "data-collections";
    /** Regular expression used to split a data collection identifier on dots. */
    public static final String POINT_REGEX = "\\.";

    // NOTE(review): plain HashMap that is written by registerSignalAction and read by processSignal;
    // no synchronization is visible in this class - confirm registrations happen before start().
    private final Map<String, SignalAction<P>> signalActions = new HashMap<>();
    private final CommonConnectorConfig connectorConfig;
    private final List<SignalChannelReader> enabledChannelReaders;
    private final List<SignalChannelReader> signalChannelReaders;
    private final ScheduledExecutorService signalProcessorExecutor;
    private final DocumentReader documentReader;
    private final Map<P, O> partitionOffsets = new ConcurrentHashMap<>();
    private final Semaphore semaphore = new Semaphore(1);

    /**
     * Creates the processor, initializes every enabled channel reader and registers the given actions.
     *
     * @param connector the connector class, used to name the executor thread
     * @param config the connector configuration
     * @param signalActions the actions to register, keyed by signal type
     * @param signalChannelReaders all available channel readers; only those enabled in {@code config} are initialized
     * @param documentReader the reader used to parse the signal data
     * @param previousOffsets the initial offsets, may be {@code null}
     */
    public SignalProcessor(Class<? extends SourceConnector> connector, CommonConnectorConfig config, Map<String, SignalAction<P>> signalActions, List<SignalChannelReader> signalChannelReaders, DocumentReader documentReader, Offsets<P, O> previousOffsets) {
        this.connectorConfig = config;
        this.signalChannelReaders = signalChannelReaders;
        this.documentReader = documentReader;
        this.copyNonNullOffsets(previousOffsets);
        this.signalProcessorExecutor = Threads.newSingleThreadScheduledExecutor(connector, config.getLogicalName(), SignalProcessor.class.getSimpleName(), false);
        this.enabledChannelReaders = this.getEnabledChannelReaders();
        this.enabledChannelReaders.forEach(signalChannelReader -> signalChannelReader.init(this.connectorConfig));
        this.signalActions.putAll(signalActions);
    }

    /**
     * Adds every partition of {@code offsets} that has a non-null offset to {@link #partitionOffsets}.
     * Entries without an offset are skipped because {@link ConcurrentHashMap} rejects null values.
     */
    private void copyNonNullOffsets(Offsets<P, O> offsets) {
        if (offsets == null) {
            return;
        }
        offsets.getOffsets().forEach((partition, offset) -> {
            if (offset != null) {
                this.partitionOffsets.put(partition, offset);
            }
        });
    }

    /** Returns a predicate accepting the readers whose name is listed among the enabled channels. */
    private Predicate<SignalChannelReader> isEnabled() {
        return reader -> this.connectorConfig.getEnabledChannels().contains(reader.name());
    }

    /** Returns the subset of the configured readers that is enabled in the connector configuration. */
    private List<SignalChannelReader> getEnabledChannelReaders() {
        return this.signalChannelReaders.stream().filter(this.isEnabled()).collect(Collectors.toList());
    }

    /**
     * Replaces the tracked partition offsets with the non-null entries of the given offsets.
     *
     * @param offsets the new offsets; {@code null} simply clears the tracked offsets
     */
    public void setContext(Offsets<P, O> offsets) {
        this.partitionOffsets.clear();
        this.copyNonNullOffsets(offsets);
        LOGGER.debug("Updated offset contexts for {} partition(s)", this.partitionOffsets.size());
    }

    /** Schedules {@link #process()} at a fixed rate given by the configured signal poll interval. */
    public void start() {
        long pollIntervalMs = this.connectorConfig.getSignalPollInterval().toMillis();
        LOGGER.info("SignalProcessor started. Scheduling it every {}ms", pollIntervalMs);
        this.signalProcessorExecutor.scheduleAtFixedRate(this::process, 0L, pollIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Closes the enabled channel readers and shuts the executor down, forcing the shutdown when the
     * executor does not terminate within the configured timeout.
     *
     * @throws InterruptedException if interrupted while waiting for the executor to terminate
     */
    public void stop() throws InterruptedException {
        // The close is submitted to the processing executor rather than run on the calling thread.
        // NOTE(review): presumably so that readers are closed on the same thread that reads them - confirm.
        this.signalProcessorExecutor.submit(() -> this.enabledChannelReaders.forEach(SignalChannelReader::close));
        this.signalProcessorExecutor.shutdown();
        long shutdownTimeoutMs = this.connectorConfig.getExecutorShutdownTimeout().toMillis();
        boolean isShutdown = this.signalProcessorExecutor.awaitTermination(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
        if (!isShutdown) {
            LOGGER.warn("SignalProcessor didn't stop in the expected time, shutting down executor now");
            // Thread.interrupted() clears the interrupt status of the calling thread.
            // NOTE(review): presumably so that the forced termination below is always attempted - confirm.
            Thread.interrupted();
            this.signalProcessorExecutor.shutdownNow();
            this.signalProcessorExecutor.awaitTermination(this.connectorConfig.getExecutorShutdownTimeout().toMillis(), TimeUnit.MILLISECONDS);
        }
        LOGGER.info("SignalProcessor stopped");
    }

    /**
     * Registers (or replaces) the action executed for the given signal type.
     *
     * @param id the signal type
     * @param signal the action to execute when a signal of that type arrives
     */
    public void registerSignalAction(String id, SignalAction<P> signal) {
        LOGGER.debug("Registering signal '{}' using class '{}'", id, signal.getClass().getName());
        this.signalActions.put(id, signal);
    }

    /** Reads the pending signals from every enabled channel and processes them without a known partition. */
    public void process() {
        this.executeWithSemaphore(() -> {
            LOGGER.trace("SignalProcessor processing");
            this.enabledChannelReaders.stream()
                    .map(SignalChannelReader::read)
                    .flatMap(Collection::stream)
                    .forEach(signalRecord -> this.processSignal(signalRecord, null));
        });
    }

    /**
     * Reads the pending signals from the enabled {@link SourceSignalChannel} readers only and
     * processes them for the given partition.
     *
     * @param partition the partition the signals are processed for
     */
    public void processSourceSignal(P partition) {
        this.executeWithSemaphore(() -> {
            LOGGER.trace("Processing source signals for partition {}", partition);
            this.enabledChannelReaders.stream()
                    .filter(SignalProcessor.isSignal(SourceSignalChannel.class))
                    .map(SignalChannelReader::read)
                    .flatMap(Collection::stream)
                    .forEach(signalRecord -> this.processSignal(signalRecord, partition));
        });
    }

    /**
     * Returns the first configured reader (enabled or not) whose class is exactly {@code channel}.
     *
     * @param channel the exact reader class to look up
     * @param <T> the reader type
     * @return the matching reader
     * @throws java.util.NoSuchElementException if no reader of that class is configured
     */
    public <T extends SignalChannelReader> T getSignalChannel(Class<T> channel) {
        return this.signalChannelReaders.stream()
                .filter(SignalProcessor.isSignal(channel))
                .findFirst()
                .map(channel::cast)
                .orElseThrow(() -> new java.util.NoSuchElementException("No signal channel reader of type " + channel.getName() + " is registered"));
    }

    /**
     * Runs the operation while holding the processing semaphore.
     *
     * <p>The operation is skipped when the semaphore cannot be acquired within
     * {@link #SEMAPHORE_WAIT_TIME} seconds: running it anyway would defeat the mutual exclusion the
     * semaphore provides. The operations passed in by this class read the channels themselves, so a
     * skipped round does not consume any signal.
     *
     * @throws DebeziumException if the thread is interrupted while waiting for the semaphore
     */
    private void executeWithSemaphore(Runnable operation) {
        boolean acquired;
        try {
            acquired = this.semaphore.tryAcquire(SEMAPHORE_WAIT_TIME, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so that callers can still observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while waiting up to {}s to acquire semaphore", SEMAPHORE_WAIT_TIME);
            throw new DebeziumException("Not able to acquire semaphore during signaling processing", e);
        }
        if (!acquired) {
            LOGGER.warn("Not able to acquire semaphore after {}s, skipping this signal processing round", SEMAPHORE_WAIT_TIME);
            return;
        }
        try {
            operation.run();
        }
        finally {
            this.semaphore.release();
        }
    }

    /**
     * Looks up the action registered for the signal type and executes it once for every resolved
     * partition. Unknown types, unparsable data and action failures are logged and not rethrown.
     *
     * @param signalRecord the signal to process
     * @param knownPartition the partition the signal belongs to, or {@code null} to resolve it from the signal data
     */
    private void processSignal(SignalRecord signalRecord, P knownPartition) {
        LOGGER.debug("Signal Processor partition offsets: {}", this.partitionOffsets.keySet());
        LOGGER.debug("Received signal id = '{}', type = '{}', data = '{}'", signalRecord.getId(), signalRecord.getType(), signalRecord.getData());
        SignalAction<P> action = this.signalActions.get(signalRecord.getType());
        if (action == null) {
            LOGGER.warn("Signal '{}' has been received but the type '{}' is not recognized", signalRecord.getId(), signalRecord.getType());
            return;
        }
        try {
            String rawData = signalRecord.getData();
            Document jsonData = rawData == null || rawData.isEmpty() ? Document.create() : this.documentReader.read(rawData);
            Offsets<P, O> finalOffsets = this.getOffsets(knownPartition, jsonData);
            for (Map.Entry<P, O> entry : finalOffsets.getOffsets().entrySet()) {
                this.executeSignal(action, signalRecord, jsonData, entry.getKey(), entry.getValue());
            }
        }
        catch (IOException e) {
            LOGGER.warn("Signal '{}' has been received but the data '{}' cannot be parsed", signalRecord.getId(), signalRecord.getData(), e);
        }
        catch (InterruptedException e) {
            LOGGER.warn("Action {} has been interrupted. The signal {} may not have been processed.", signalRecord.getType(), signalRecord);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            LOGGER.warn("Action {} failed. The signal {} may not have been processed.", signalRecord.getType(), signalRecord, e);
        }
    }

    /**
     * Resolves the partitions (with their tracked offsets) a signal applies to, in this order:
     * <ol>
     * <li>the known partition, when one is given (its offset is {@code null} if it is not tracked);</li>
     * <li>the only tracked partition, when exactly one is tracked;</li>
     * <li>the partition matched from the {@code data-collections} of the signal data, or no
     * partition at all when the matched partition has no tracked offset;</li>
     * <li>every tracked partition otherwise.</li>
     * </ol>
     */
    private Offsets<P, O> getOffsets(P knownPartition, Document jsonData) {
        if (knownPartition != null) {
            return Offsets.of(knownPartition, this.partitionOffsets.get(knownPartition));
        }
        if (this.partitionOffsets.size() == 1) {
            return Offsets.of(this.partitionOffsets);
        }
        Optional<P> targetPartition = this.extractPartitionFromData(jsonData);
        if (!targetPartition.isPresent()) {
            return Offsets.of(this.partitionOffsets);
        }
        P partition = targetPartition.get();
        O offset = this.partitionOffsets.get(partition);
        if (offset != null) {
            return Offsets.of(partition, offset);
        }
        LOGGER.warn("Signal references partition {} which is not managed by this task. Available partitions: {}", partition, this.partitionOffsets.keySet());
        return Offsets.of(Map.of());
    }

    /**
     * Returns the first tracked partition whose database matches the leading dot-separated segment
     * of one of the {@code data-collections} entries, or an empty optional when there is no such
     * entry or no partition matches.
     */
    private Optional<P> extractPartitionFromData(Document jsonData) {
        Array dataCollectionsArray = jsonData.getArray(DATA_COLLECTIONS_FIELD_NAME);
        if (dataCollectionsArray == null || dataCollectionsArray.isEmpty()) {
            LOGGER.debug("No data-collections found in signal data");
            return Optional.empty();
        }
        for (Array.Entry entry : dataCollectionsArray) {
            String dataCollection = entry.getValue().asString().trim();
            String[] parts = dataCollection.split(POINT_REGEX);
            if (parts.length < 2) {
                LOGGER.debug("Data collection '{}' does not contain database qualifier", dataCollection);
                continue;
            }
            // The first segment of a qualified identifier is treated as the database name.
            String databaseName = parts[0];
            for (P partition : this.partitionOffsets.keySet()) {
                if (this.matchesDatabase(partition, databaseName)) {
                    LOGGER.debug("Matched data collection '{}' to partition {}", dataCollection, partition);
                    return Optional.of(partition);
                }
            }
        }
        return Optional.empty();
    }

    /**
     * Tells whether the partition is an {@link AbstractPartition} whose {@code dbz.databaseName}
     * logging context entry equals the given database name, ignoring case.
     */
    private boolean matchesDatabase(P partition, String databaseName) {
        if (!(partition instanceof AbstractPartition)) {
            return false;
        }
        Map<String, String> loggingContext = partition.getLoggingContext();
        String partitionDatabaseName = loggingContext.get("dbz.databaseName");
        // equalsIgnoreCase returns false when the logging context has no database name.
        return databaseName.equalsIgnoreCase(partitionDatabaseName);
    }

    /**
     * Wraps the signal into a {@link SignalPayload} and hands it to the action.
     *
     * @throws InterruptedException if the action is interrupted
     */
    private void executeSignal(SignalAction<P> action, SignalRecord signalRecord, Document jsonData, P partition, O offset) throws InterruptedException {
        SignalPayload<P> payload = new SignalPayload<>(partition, signalRecord.getId(), signalRecord.getType(), jsonData, offset, signalRecord.getAdditionalData());
        action.arrived(payload);
    }

    /** Returns a predicate accepting the readers whose class is exactly {@code channelClass} (subclasses do not match). */
    private static <T extends SignalChannelReader> Predicate<SignalChannelReader> isSignal(Class<T> channelClass) {
        return channel -> channel.getClass().equals(channelClass);
    }
}

