/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that no message stays trapped in the JDBC store when a batch execution reports a
 * failure to the broker after the underlying batch has already been executed.
 *
 * <p>The broker is backed by a {@link TestJDBCPersistenceAdapter} whose single shared
 * {@link TestTransactionContext} throws a {@link SQLException} on one specific
 * {@code executeBatch()} call, after delegating to the real implementation. The test sends
 * three persistent messages over a failover transport, consumes them, and asserts that the
 * {@code ACTIVEMQ_MSGS} table is empty afterwards.
 */
public class TrapMessageInJDBCStoreTest extends TestCase {
    private static final String MY_TEST_Q = "MY_TEST_Q";
    private static final Logger LOG = LoggerFactory.getLogger(TrapMessageInJDBCStoreTest.class);

    /** Ordinal of the {@code executeBatch()} call on which the injected SQLException is thrown. */
    private static final int FAILING_BATCH_CALL = 16;

    /** Per-receive wait in milliseconds; a timed-out receive marks the end of consumption. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 4000L;

    /** Starts as a wildcard-port URL and is replaced by the connector's publishable URL. */
    private String transportUrl = "tcp://127.0.0.1:0";
    private BrokerService broker;
    private TestTransactionContext testTransactionContext;
    private TestJDBCPersistenceAdapter jdbc;

    /** Separate JDBC connection used by the test itself to inspect the message table. */
    private Connection checkOnStoreConnection;

    /**
     * Builds (but does not start) a broker that persists through the fault-injecting JDBC adapter.
     *
     * <p>As side effects this opens {@link #checkOnStoreConnection}, initialises {@link #jdbc} and
     * {@link #testTransactionContext}, and overwrites {@link #transportUrl} with the connect
     * string published by the newly added connector.
     *
     * @param withJMX whether the broker should enable JMX
     * @return the configured broker
     * @throws Exception if the data source, connection or connector cannot be set up
     */
    protected BrokerService createBroker(boolean withJMX) throws Exception {
        BrokerService newBroker = new BrokerService();
        newBroker.setUseJmx(withJMX);

        EmbeddedDataSource embeddedDataSource =
                (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
        checkOnStoreConnection = embeddedDataSource.getConnection();

        jdbc = new TestJDBCPersistenceAdapter();
        jdbc.setDataSource(embeddedDataSource);
        jdbc.setCleanupPeriod(0);
        testTransactionContext = new TestTransactionContext(jdbc);
        jdbc.setLockKeepAlivePeriod(1000L);

        LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
        leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
        jdbc.setLocker(leaseDatabaseLocker);

        newBroker.setPersistenceAdapter(jdbc);
        newBroker.setIoExceptionHandler(new LeaseLockerIOExceptionHandler());

        transportUrl = newBroker.addConnector(transportUrl).getPublishableConnectString();
        return newBroker;
    }

    /**
     * Sends three messages, consumes them, and verifies that all three were received and that the
     * message table is empty afterwards, despite the SQLException injected by
     * {@link TestTransactionContext#executeBatch()}.
     *
     * @throws Exception on any broker, JMS or JDBC failure
     */
    public void testDBCommitException() throws Exception {
        org.apache.logging.log4j.core.Logger serviceLogger = org.apache.logging.log4j.core.Logger.class.cast(
                LogManager.getLogger(TransportConnection.class.getName() + ".Service"));
        serviceLogger.setLevel(Level.TRACE);

        broker = createBroker(false);
        broker.deleteAllMessages();
        broker.start();
        broker.waitUntilStarted();
        try {
            LOG.info("***Broker started...");
            String failoverTransportURL = "failover:(" + transportUrl + ")?timeout=5000";

            sendMessage(MY_TEST_Q, failoverTransportURL);
            List<Long> dbSeq = dbMessageCount(checkOnStoreConnection);
            LOG.info("*** after send: db contains message seq {}", dbSeq);

            List<TextMessage> consumedMessages = consumeMessages(MY_TEST_Q, failoverTransportURL);
            assertEquals("number of consumed messages", 3, consumedMessages.size());

            dbSeq = dbMessageCount(checkOnStoreConnection);
            LOG.info("*** after consume - db contains message seq {}", dbSeq);
            assertEquals("number of messages in DB after test", 0, dbSeq.size());
        } finally {
            try {
                checkOnStoreConnection.close();
            } catch (Exception ignored) {
                // Best effort: a failure to close the inspection connection must neither mask the
                // test outcome nor prevent the broker from being stopped below.
                LOG.debug("Ignoring failure while closing the store inspection connection", ignored);
            }
            broker.stop();
            broker.waitUntilStopped();
        }
    }

    /**
     * Receives text messages from {@code queue} until a receive call times out.
     *
     * @param queue name of the queue to consume from
     * @param transportURL broker URL handed to the connection factory
     * @return the messages received, in arrival order; empty if none arrived in time
     * @throws JMSException if connecting, receiving or closing the connection fails
     */
    public List<TextMessage> consumeMessages(String queue, String transportURL) throws JMSException {
        LOG.debug("*** consumeMessages() called ...");
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL);
        // The connection is created in the resource specification: a try-with-resources variable
        // is implicitly final and cannot be declared as null and assigned inside the body.
        try (jakarta.jms.Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(queue);
            MessageConsumer messageConsumer = session.createConsumer(destination);

            List<TextMessage> consumedMessages = new ArrayList<>();
            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(RECEIVE_TIMEOUT_MILLIS);
                LOG.debug("*** consumed Messages :{}", textMessage);
                if (textMessage == null) {
                    // A timed-out receive returns null, which ends the loop.
                    return consumedMessages;
                }
                consumedMessages.add(textMessage);
            }
        }
    }

    /**
     * Sends three persistent text messages ("1", "2", "3") to {@code queue}, reusing one message
     * instance, and asserts that exactly two rows are in the store after the second send.
     *
     * @param queue name of the queue to send to
     * @param transportURL broker URL handed to the connection factory
     * @throws Exception if sending, the store query or closing the connection fails
     */
    public void sendMessage(String queue, String transportURL) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL);
        // See consumeMessages: the resource must be initialised here, not assigned in the body.
        try (jakarta.jms.Connection connection = factory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(queue);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(jakarta.jms.DeliveryMode.PERSISTENT);

            TextMessage m = session.createTextMessage("1");
            LOG.debug("*** send message 1 to broker...");
            producer.send(m);

            LOG.debug("***  send message 2 to broker");
            m.setText("2");
            producer.send(m);

            List<Long> dbSeq = dbMessageCount(checkOnStoreConnection);
            LOG.info("*** after send 2 - db contains message seq {}", dbSeq);
            assertEquals("number of messages in DB after send 2", 2, dbSeq.size());

            LOG.debug("***  send  message 3 to broker");
            m.setText("3");
            producer.send(m);
            LOG.debug("*** Finished sending messages to broker");
        }
    }

    /**
     * Reads the {@code MSGID_SEQ} value of every row currently in {@code ACTIVEMQ_MSGS}.
     *
     * @param checkOnStoreConnection open JDBC connection to query through; it is not closed here
     * @return one sequence value per stored row, in result-set order
     * @throws SQLException if the query fails
     */
    private List<Long> dbMessageCount(Connection checkOnStoreConnection) throws SQLException {
        try (PreparedStatement statement = checkOnStoreConnection.prepareStatement("SELECT MSGID_SEQ FROM ACTIVEMQ_MSGS");
                ResultSet result = statement.executeQuery()) {
            List<Long> dbSeq = new ArrayList<>();
            while (result.next()) {
                dbSeq.add(result.getLong(1));
            }
            return dbSeq;
        }
    }

    /**
     * Persistence adapter that always hands out the enclosing test's single
     * {@link TestTransactionContext}, so every store operation goes through the fault-injecting
     * context.
     */
    public class TestJDBCPersistenceAdapter
    extends JDBCPersistenceAdapter {
        /**
         * @return the shared {@link TestTransactionContext} held by the enclosing test instance
         */
        @Override
        public TransactionContext getTransactionContext() throws IOException {
            return TrapMessageInJDBCStoreTest.this.testTransactionContext;
        }
    }

    /**
     * Transaction context that counts {@code executeBatch()} calls and, on one specific call,
     * throws after the real batch has already been executed.
     */
    public class TestTransactionContext
    extends TransactionContext {
        /** Number of {@code executeBatch()} calls that have completed the super call so far. */
        private int count;

        /**
         * @param jdbcPersistenceAdapter adapter passed to the superclass constructor, together
         *     with {@code -1} for both of its int arguments
         * @throws IOException if the superclass constructor fails
         */
        public TestTransactionContext(JDBCPersistenceAdapter jdbcPersistenceAdapter) throws IOException {
            super(jdbcPersistenceAdapter, -1, -1);
        }

        /**
         * Executes the real batch first, then throws on call number {@code FAILING_BATCH_CALL},
         * so the caller sees a failure for work the superclass has already carried out.
         *
         * @throws SQLException from the superclass, or the injected test exception
         */
        @Override
        public void executeBatch() throws SQLException {
            super.executeBatch();
            ++count;
            // The RuntimeException is never thrown; it only makes the logger print the call stack
            // of each batch execution.
            LOG.debug("ExecuteBatchOverride: count:{}", count, new RuntimeException("executeBatch"));
            if (count == FAILING_BATCH_CALL) {
                throw new SQLException("TEST SQL EXCEPTION from executeBatch after super.execution: count:" + count);
            }
        }
    }
}

