/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer.processor.infinispan;

import io.debezium.DebeziumException;
import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
import io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.InfinispanLogMinerCache;
import io.debezium.connector.oracle.logminer.processor.infinispan.InfinispanTransaction;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import java.util.Map;
import java.util.Objects;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
import org.infinispan.configuration.parsing.ParserRegistry;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * LogMiner event processor whose buffers are Infinispan caches served by an
 * {@link EmbeddedCacheManager} that this instance creates in its constructor and
 * shuts down in {@link #close()}.
 *
 * <p>Four named caches are defined from the connector configuration: transactions,
 * processed transactions, schema changes and events.
 */
public class EmbeddedInfinispanLogMinerEventProcessor extends AbstractInfinispanLogMinerEventProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedInfinispanLogMinerEventProcessor.class);

    // Cache names are shared between cache creation and cache removal, so keep them in one place.
    private static final String TRANSACTIONS_CACHE_NAME = "transactions";
    private static final String PROCESSED_TRANSACTIONS_CACHE_NAME = "processed-transactions";
    private static final String SCHEMA_CHANGES_CACHE_NAME = "schema-changes";
    private static final String EVENTS_CACHE_NAME = "events";

    private final EmbeddedCacheManager cacheManager;
    private final boolean dropBufferOnStop;
    private final LogMinerCache<String, InfinispanTransaction> transactionCache;
    private final LogMinerCache<String, LogMinerEvent> eventCache;
    private final LogMinerCache<String, String> processedTransactionsCache;
    private final LogMinerCache<String, String> schemaChangesCache;

    /**
     * Creates the processor, starts an embedded cache manager and defines the four caches.
     *
     * <p>If anything fails after the cache manager has been created (for example an invalid
     * cache configuration), the cache manager is closed before the failure is rethrown so
     * that it is not left running without an owner.
     *
     * @param context the change event source context, passed to the superclass
     * @param connectorConfig the connector configuration; supplies the global and per-cache
     *        Infinispan configuration and the drop-buffer-on-stop flag
     * @param jdbcConnection the Oracle connection, passed to the superclass
     * @param dispatcher the event dispatcher, passed to the superclass
     * @param partition the source partition, passed to the superclass
     * @param offsetContext the offset context, passed to the superclass
     * @param schema the database schema, passed to the superclass
     * @param metrics the streaming metrics, passed to the superclass
     * @throws DebeziumException if a cache configuration does not contain exactly one cache definition
     * @throws NullPointerException if a cache configuration is not present in the connector configuration
     */
    public EmbeddedInfinispanLogMinerEventProcessor(ChangeEventSource.ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, EventDispatcher<OraclePartition, TableId> dispatcher, OraclePartition partition, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, LogMinerStreamingChangeEventSourceMetrics metrics) {
        super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
        LOGGER.info("Using Infinispan in embedded mode.");
        this.cacheManager = new DefaultCacheManager(this.parseAndGetGlobalConfiguration(connectorConfig));
        try {
            this.dropBufferOnStop = connectorConfig.isLogMiningBufferDropOnStop();
            this.transactionCache = this.createCache(TRANSACTIONS_CACHE_NAME, connectorConfig, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS);
            this.processedTransactionsCache = this.createCache(PROCESSED_TRANSACTIONS_CACHE_NAME, connectorConfig, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_PROCESSED_TRANSACTIONS);
            this.schemaChangesCache = this.createCache(SCHEMA_CHANGES_CACHE_NAME, connectorConfig, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES);
            this.eventCache = this.createCache(EVENTS_CACHE_NAME, connectorConfig, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS);
            this.reCreateInMemoryCache();
            this.displayCacheStatistics();
        }
        catch (RuntimeException | Error e) {
            // The caller never receives a reference to this instance, so nobody else can
            // shut the cache manager down; do it here and keep the original failure primary.
            try {
                this.cacheManager.close();
            }
            catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Shuts down the embedded cache manager.
     *
     * <p>When the processor was configured to drop its buffer on stop, the four caches are
     * first cleared and then removed through the cache manager's administration API. The
     * cache manager is closed even if that cleanup throws.
     *
     * @throws Exception if clearing or removing a cache fails, or if closing the cache manager
     *         fails (a failure while closing takes precedence over an earlier cleanup failure)
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.dropBufferOnStop) {
                LOGGER.info("Clearing infinispan caches");
                this.transactionCache.clear();
                this.eventCache.clear();
                this.schemaChangesCache.clear();
                this.processedTransactionsCache.clear();
                this.cacheManager.administration().removeCache(TRANSACTIONS_CACHE_NAME);
                this.cacheManager.administration().removeCache(PROCESSED_TRANSACTIONS_CACHE_NAME);
                this.cacheManager.administration().removeCache(SCHEMA_CHANGES_CACHE_NAME);
                this.cacheManager.administration().removeCache(EVENTS_CACHE_NAME);
            }
        }
        finally {
            // Always release the cache manager, otherwise a cleanup failure would leave it running.
            LOGGER.info("Shutting down infinispan embedded caches");
            this.cacheManager.close();
        }
    }

    /** {@inheritDoc} */
    @Override
    public LogMinerCache<String, InfinispanTransaction> getTransactionCache() {
        return this.transactionCache;
    }

    /** {@inheritDoc} */
    @Override
    public LogMinerCache<String, LogMinerEvent> getEventCache() {
        return this.eventCache;
    }

    /** {@inheritDoc} */
    @Override
    public LogMinerCache<String, String> getSchemaChangesCache() {
        return this.schemaChangesCache;
    }

    /** {@inheritDoc} */
    @Override
    public LogMinerCache<String, String> getProcessedTransactionsCache() {
        return this.processedTransactionsCache;
    }

    /**
     * Defines a named cache on the embedded cache manager and wraps it as a {@link LogMinerCache}.
     *
     * @param cacheName the name under which the cache is defined and looked up
     * @param connectorConfig the connector configuration holding the cache definition
     * @param field the configuration field whose string value is the cache definition
     * @param <K> the cache key type
     * @param <V> the cache value type
     * @return a wrapper around the cache obtained from the cache manager
     * @throws NullPointerException if {@code cacheName} or the configured cache definition is {@code null}
     * @throws DebeziumException if the definition does not contain exactly one cache configuration
     */
    private <K, V> LogMinerCache<K, V> createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) {
        Objects.requireNonNull(cacheName, "cacheName must not be null");
        final String cacheConfiguration = connectorConfig.getConfig().getString(field);
        Objects.requireNonNull(cacheConfiguration, "Infinispan cache configuration for '" + cacheName + "' must not be null");
        this.cacheManager.defineConfiguration(cacheName, this.parseAndGetConfiguration(cacheName, cacheConfiguration));
        return new InfinispanLogMinerCache<>(this.cacheManager.getCache(cacheName));
    }

    /**
     * Parses a cache definition and builds the single cache configuration it contains.
     *
     * @param cacheName the cache name, used only in error messages
     * @param configuration the cache definition text handed to Infinispan's {@link ParserRegistry}
     * @return the built configuration of the one cache found in the definition
     * @throws DebeziumException if the definition contains no cache configuration or more than one
     */
    private Configuration parseAndGetConfiguration(String cacheName, String configuration) {
        final ConfigurationBuilderHolder builderHolder = new ParserRegistry().parse(configuration);
        final Map<String, ConfigurationBuilder> builders = builderHolder.getNamedConfigurationBuilders();
        if (builders.size() > 1) {
            throw new DebeziumException("Infinispan cache configuration for '" + cacheName + "' contains multiple cache configurations and should only contain one.");
        }
        if (builders.isEmpty()) {
            throw new DebeziumException("Infinispan cache configuration for '" + cacheName + "' contained no valid cache configuration. Please check your connector configuration");
        }
        return builders.values().iterator().next().build();
    }

    /**
     * Builds the global configuration for the embedded cache manager.
     *
     * @param connectorConfig the connector configuration that may carry a global Infinispan configuration
     * @return the parsed global configuration, or the default produced by an unmodified
     *         {@link GlobalConfigurationBuilder} when the connector does not supply one
     */
    private GlobalConfiguration parseAndGetGlobalConfiguration(OracleConnectorConfig connectorConfig) {
        final String globalCacheConfiguration = connectorConfig.getLogMiningInifispanGlobalConfiguration();
        if (globalCacheConfiguration == null) {
            return new GlobalConfigurationBuilder().build();
        }
        final ConfigurationBuilderHolder builderHolder = new ParserRegistry().parse(globalCacheConfiguration);
        return builderHolder.getGlobalConfigurationBuilder().build();
    }
}

