/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TransactionRolledBackException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.XAException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.AsyncCallback;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ3166Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ3166Test.class);
    private BrokerService brokerService;
    private AtomicInteger sendAttempts = new AtomicInteger(0);

    @Test
    public void testCommitThroughAsyncErrorNoForceRollback() throws Exception {
        this.startBroker(false);
        Connection connection = this.createConnection();
        connection.start();
        Session session = connection.createSession(true, 0);
        MessageProducer producer = session.createProducer((Destination)session.createQueue("QAT"));
        for (int i = 0; i < 10; ++i) {
            producer.send((Message)session.createTextMessage("Hello A"));
        }
        session.commit();
        Assert.assertTrue((String)"only one message made it through", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ3166Test.this.brokerService.getAdminView().getTotalEnqueueCount() == 1L;
            }
        }));
        connection.close();
    }

    @Test
    public void testCommitThroughAsyncErrorForceRollback() throws Exception {
        this.startBroker(true);
        Connection connection = this.createConnection();
        connection.start();
        Session session = connection.createSession(true, 0);
        MessageProducer producer = session.createProducer((Destination)session.createQueue("QAT"));
        try {
            for (int i = 0; i < 10; ++i) {
                producer.send((Message)session.createTextMessage("Hello A"));
            }
            session.commit();
            Assert.fail((String)"Expect TransactionRolledBackException");
        }
        catch (JMSException expected) {
            Assert.assertTrue((boolean)(expected.getCause() instanceof XAException));
        }
        Assert.assertTrue((String)"only one message made it through", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ3166Test.this.brokerService.getAdminView().getTotalEnqueueCount() == 0L;
            }
        }));
        connection.close();
    }

    @Test
    public void testAckCommitThroughAsyncErrorForceRollback() throws Exception {
        this.startBroker(true);
        Connection connection = this.createConnection();
        connection.start();
        Session session = connection.createSession(true, 0);
        Queue destination = session.createQueue("QAT");
        MessageProducer producer = session.createProducer((Destination)destination);
        producer.send((Message)session.createTextMessage("Hello A"));
        producer.close();
        session.commit();
        MessageConsumer messageConsumer = session.createConsumer((Destination)destination);
        Assert.assertNotNull((String)"got message", (Object)messageConsumer.receive(4000L));
        try {
            session.commit();
            Assert.fail((String)"Expect TransactionRolledBackException");
        }
        catch (JMSException expected) {
            Assert.assertTrue((boolean)(expected.getCause() instanceof XAException));
            Assert.assertTrue((boolean)(expected.getCause().getCause() instanceof TransactionRolledBackException));
            Assert.assertTrue((boolean)(expected.getCause().getCause().getCause() instanceof RuntimeException));
        }
        Assert.assertTrue((String)"one message still there!", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ3166Test.this.brokerService.getAdminView().getTotalMessageCount() == 1L;
            }
        }));
        connection.close();
    }

    @Test
    public void testErrorOnSyncSend() throws Exception {
        this.startBroker(false);
        ActiveMQConnection connection = (ActiveMQConnection)this.createConnection();
        connection.setAlwaysSyncSend(true);
        connection.start();
        Session session = connection.createSession(true, 0);
        MessageProducer producer = session.createProducer((Destination)session.createQueue("QAT"));
        try {
            for (int i = 0; i < 10; ++i) {
                producer.send((Message)session.createTextMessage("Hello A"));
            }
            session.commit();
        }
        catch (JMSException expectedSendFail) {
            LOG.info("Got expected: " + String.valueOf((Object)expectedSendFail));
            session.rollback();
        }
        Assert.assertTrue((String)"only one message made it through", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ3166Test.this.brokerService.getAdminView().getTotalEnqueueCount() == 0L;
            }
        }));
        connection.close();
    }

    @Test
    public void testRollbackOnAsyncErrorAmqApi() throws Exception {
        this.startBroker(false);
        ActiveMQConnection connection = (ActiveMQConnection)this.createConnection();
        connection.start();
        final ActiveMQSession session = (ActiveMQSession)connection.createSession(true, 0);
        int batchSize = 10;
        final CountDownLatch batchSent = new CountDownLatch(batchSize);
        ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer((Destination)session.createQueue("QAT"));
        for (int i = 0; i < batchSize; ++i) {
            producer.send((Message)session.createTextMessage("Hello A"), new AsyncCallback(){

                public void onSuccess() {
                    batchSent.countDown();
                }

                public void onException(JMSException e) {
                    session.getTransactionContext().setRollbackOnly(true);
                    batchSent.countDown();
                }
            });
            if (i != 0) continue;
            session.getTransactionContext().addSynchronization(new Synchronization(){

                public void beforeEnd() throws Exception {
                    if (!batchSent.await(10L, TimeUnit.SECONDS)) {
                        LOG.error("TimedOut waiting for aync send requests!");
                        session.getTransactionContext().setRollbackOnly(true);
                    }
                    super.beforeEnd();
                }
            });
        }
        try {
            session.commit();
            Assert.fail((String)"expect rollback on async error");
        }
        catch (TransactionRolledBackException transactionRolledBackException) {
            // empty catch block
        }
        Assert.assertTrue((String)"only one message made it through", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ3166Test.this.brokerService.getAdminView().getTotalEnqueueCount() == 0L;
            }
        }));
        connection.close();
    }

    private Connection createConnection() throws Exception {
        String connectionURI = ((TransportConnector)this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionURI);
        cf.setWatchTopicAdvisories(false);
        return cf.createConnection();
    }

    public void startBroker(boolean forceRollbackOnAsyncSendException) throws Exception {
        this.brokerService = this.createBroker(forceRollbackOnAsyncSendException);
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
            this.brokerService = null;
        }
    }

    protected BrokerService createBroker(boolean forceRollbackOnAsyncSendException) throws Exception {
        BrokerService answer = new BrokerService();
        answer.setPersistent(true);
        answer.setDeleteAllMessagesOnStartup(true);
        answer.setAdvisorySupport(false);
        answer.setRollbackOnlyOnAsyncException(forceRollbackOnAsyncSendException);
        answer.addConnector("tcp://0.0.0.0:0");
        answer.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
                if (ack.isStandardAck()) {
                    throw new RuntimeException("no way, won't allow any standard ack");
                }
                super.acknowledge(consumerExchange, ack);
            }

            public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception {
                if (AMQ3166Test.this.sendAttempts.incrementAndGet() > 1) {
                    throw new RuntimeException("no way, won't accept any messages");
                }
                super.send(producerExchange, messageSend);
            }
        }});
        return answer;
    }
}

