/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests a transacted consumer against an embedded, non-persistent broker when
 * the connection factory has {@code nonBlockingRedelivery} switched on.
 *
 * <p>Each test gets its own broker (see {@link #startBroker()}) and a redelivery
 * policy with a fixed two second delay and unlimited redeliveries, unless the
 * test overrides it on its own connection.
 *
 * <p>Messages are collected by listener callbacks while the test code polls the
 * collections, so every shared collection is a synchronized set and snapshots
 * are taken while holding its lock.
 */
public class NonBlockingConsumerRedeliveryTest {
    private static final Logger LOG = LoggerFactory.getLogger(NonBlockingConsumerRedeliveryTest.class);

    /** Number of messages published by one call to {@link #sendMessages()}. */
    private static final int MSG_COUNT = 100;

    private static final String PRE_ROLLBACK_FAILURE =
        "Pre-Rollback expects to receive: " + MSG_COUNT + " messages.";
    private static final String POST_ROLLBACK_FAILURE =
        "Post-Rollback expects to receive: " + MSG_COUNT + " messages.";

    private final String destinationName = "Destination";
    private BrokerService broker;
    private String connectionUri;
    private ActiveMQConnectionFactory connectionFactory;

    /**
     * After a rollback the same set of messages must be delivered again.
     */
    @Test
    public void testMessageDeleiveredWhenNonBlockingEnabled() throws Exception {
        final Set<Message> received = newMessageSet();
        try (Connection connection = this.connectionFactory.createConnection()) {
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(this.destinationName);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(collectInto(received));

            this.sendMessages();
            session.commit();
            connection.start();

            Assert.assertTrue(PRE_ROLLBACK_FAILURE, awaitCount(received, MSG_COUNT, ""));
            Set<Message> beforeRollback = drain(received);

            session.rollback();

            Assert.assertTrue(POST_ROLLBACK_FAILURE, awaitCount(received, MSG_COUNT, " since rollback"));
            Set<Message> afterRollback = drain(received);

            Assert.assertEquals(beforeRollback.size(), afterRollback.size());
            Assert.assertEquals(beforeRollback, afterRollback);
            session.commit();
        }
    }

    /**
     * After a rollback the messages must be redelivered in their original order.
     */
    @Test
    public void testMessageDeleiveredInCorrectOrder() throws Exception {
        final Set<Message> received = newMessageSet();
        try (Connection connection = this.connectionFactory.createConnection()) {
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(this.destinationName);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(collectInto(received));

            this.sendMessages();
            session.commit();
            connection.start();

            Assert.assertTrue(PRE_ROLLBACK_FAILURE, awaitCount(received, MSG_COUNT, ""));
            Set<Message> beforeRollback = drain(received);

            session.rollback();

            Assert.assertTrue(POST_ROLLBACK_FAILURE, awaitCount(received, MSG_COUNT, " since rollback"));
            Set<Message> afterRollback = drain(received);

            Assert.assertEquals(beforeRollback.size(), afterRollback.size());
            Assert.assertEquals(beforeRollback, afterRollback);

            // Both snapshots keep insertion order, so walking them in step
            // compares the n-th original delivery with the n-th redelivery.
            Iterator<Message> after = afterRollback.iterator();
            Iterator<Message> before = beforeRollback.iterator();
            while (before.hasNext() && after.hasNext()) {
                TextMessage original = (TextMessage) before.next();
                TextMessage rolledBack = (TextMessage) after.next();
                int originalInt = Integer.parseInt(original.getText());
                int rolledbackInt = Integer.parseInt(rolledBack.getText());
                Assert.assertEquals(originalInt, rolledbackInt);
            }
            session.commit();
        }
    }

    /**
     * Newly sent messages must still arrive while rolled back messages are
     * waiting to be redelivered.
     */
    @Test
    public void testMessageDeleiveryDoesntStop() throws Exception {
        final Set<Message> received = newMessageSet();
        try (Connection connection = this.connectionFactory.createConnection()) {
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(this.destinationName);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(collectInto(received));

            this.sendMessages();
            connection.start();

            Assert.assertTrue(PRE_ROLLBACK_FAILURE, awaitCount(received, MSG_COUNT, ""));
            Set<Message> beforeRollback = drain(received);

            session.rollback();
            // A second batch is published while the first one is pending redelivery.
            this.sendMessages();

            Assert.assertTrue(POST_ROLLBACK_FAILURE, awaitCount(received, MSG_COUNT * 2, " since rollback"));
            Set<Message> afterRollback = drain(received);

            Assert.assertEquals(beforeRollback.size() * 2, afterRollback.size());
            session.commit();
        }
    }

    /**
     * With a six second initial redelivery delay nothing may be redelivered
     * within the first four seconds after a rollback.
     */
    @Test
    public void testNonBlockingMessageDeleiveryIsDelayed() throws Exception {
        final Set<Message> received = newMessageSet();
        try (ActiveMQConnection connection = (ActiveMQConnection) this.connectionFactory.createConnection()) {
            connection.getRedeliveryPolicy().setInitialRedeliveryDelay(TimeUnit.SECONDS.toMillis(6L));
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(this.destinationName);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(collectInto(received));

            this.sendMessages();
            connection.start();

            Assert.assertTrue(PRE_ROLLBACK_FAILURE, awaitCount(received, MSG_COUNT, ""));
            received.clear();

            session.rollback();

            boolean redeliveredEarly = Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return received.size() > 0;
                }
            }, TimeUnit.SECONDS.toMillis(4L));
            Assert.assertFalse("Delayed redelivery test not expecting any messages yet.", redeliveredEarly);

            session.commit();
            session.close();
        }
    }

    /**
     * A listener that rolls back on every eleventh delivery and commits the
     * others must still end up with every message received.
     */
    @Test
    public void testNonBlockingMessageDeleiveryWithRollbacks() throws Exception {
        final Set<Message> received = newMessageSet();
        try (ActiveMQConnection connection = (ActiveMQConnection) this.connectionFactory.createConnection()) {
            final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(this.destinationName);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(collectInto(received));

            this.sendMessages();
            connection.start();

            Assert.assertTrue(PRE_ROLLBACK_FAILURE, awaitCount(received, MSG_COUNT, ""));
            received.clear();

            consumer.setMessageListener(new MessageListener() {
                private int count = 0;

                @Override
                public void onMessage(Message message) {
                    if (++this.count > 10) {
                        try {
                            session.rollback();
                            LOG.info("Rolling back session.");
                            this.count = 0;
                        } catch (JMSException e) {
                            LOG.warn("Caught an unexcepted exception: " + e.getMessage(), e);
                        }
                    } else {
                        received.add(message);
                        try {
                            session.commit();
                        } catch (JMSException e) {
                            LOG.warn("Caught an unexcepted exception: " + e.getMessage(), e);
                        }
                    }
                }
            });

            session.rollback();

            Assert.assertTrue(POST_ROLLBACK_FAILURE, awaitCount(received, MSG_COUNT, " since rollback"));
            Assert.assertEquals(MSG_COUNT, received.size());
            session.commit();
        }
    }

    /**
     * With a maximum of five redeliveries and a listener that always rolls
     * back, every message must end up on {@code ActiveMQ.DLQ}.
     */
    @Test
    public void testNonBlockingMessageDeleiveryWithAllRolledBack() throws Exception {
        final Set<Message> received = newMessageSet();
        final Set<Message> dlqed = newMessageSet();
        try (ActiveMQConnection connection = (ActiveMQConnection) this.connectionFactory.createConnection()) {
            connection.getRedeliveryPolicy().setMaximumRedeliveries(5);
            final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(this.destinationName);
            Queue dlq = session.createQueue("ActiveMQ.DLQ");
            MessageConsumer consumer = session.createConsumer(destination);
            MessageConsumer dlqConsumer = session.createConsumer(dlq);
            dlqConsumer.setMessageListener(collectInto(dlqed));
            consumer.setMessageListener(collectInto(received));

            this.sendMessages();
            connection.start();

            Assert.assertTrue(PRE_ROLLBACK_FAILURE, awaitCount(received, MSG_COUNT, ""));

            session.rollback();
            // From here on every delivery is rejected.
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        session.rollback();
                    } catch (JMSException e) {
                        LOG.warn("Caught an unexcepted exception: " + e.getMessage(), e);
                    }
                }
            });

            Assert.assertTrue("Post-Rollback expects to DLQ: " + MSG_COUNT + " messages.",
                awaitCount(dlqed, MSG_COUNT, " in DLQ"));
            session.commit();
        }
    }

    /**
     * Publishes {@link #MSG_COUNT} text messages, whose bodies are the numbers
     * {@code 0} to {@code MSG_COUNT - 1}, to the test queue on a dedicated
     * non-transacted connection that is closed before returning.
     */
    private void sendMessages() throws Exception {
        try (Connection connection = this.connectionFactory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(this.destinationName);
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < MSG_COUNT; ++i) {
                producer.send(session.createTextMessage("" + i));
            }
        }
    }

    /** Creates an insertion-ordered set that can be shared between the listener and the test. */
    private static Set<Message> newMessageSet() {
        return Collections.synchronizedSet(new LinkedHashSet<Message>());
    }

    /** Returns a listener that adds every delivered message to {@code sink}. */
    private static MessageListener collectInto(final Set<Message> sink) {
        return new MessageListener() {
            @Override
            public void onMessage(Message message) {
                sink.add(message);
            }
        };
    }

    /**
     * Copies the current content of {@code source} in iteration order and
     * empties it, holding the set's lock so no delivery slips in between.
     */
    private static Set<Message> drain(Set<Message> source) {
        synchronized (source) {
            Set<Message> snapshot = new LinkedHashSet<>(source);
            source.clear();
            return snapshot;
        }
    }

    /**
     * Polls with {@link Wait#waitFor(Wait.Condition)} until {@code messages}
     * holds exactly {@code expected} entries, logging the size on each poll.
     *
     * @param messages  the collection filled by a listener
     * @param expected  the size to wait for
     * @param qualifier text appended after "messages" in the log line, e.g. {@code " since rollback"}
     * @return the result of {@code Wait.waitFor}
     */
    private static boolean awaitCount(final Set<Message> messages, final int expected, final String qualifier)
            throws Exception {
        return Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                int size = messages.size();
                LOG.info("Consumer has received " + size + " messages" + qualifier + ".");
                return size == expected;
            }
        });
    }

    /**
     * Starts an embedded, non-persistent broker on an ephemeral TCP port and
     * prepares a connection factory with non-blocking redelivery and a fixed
     * two second redelivery delay.
     */
    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.setPersistent(false);
        this.broker.setUseJmx(false);
        this.broker.addConnector("tcp://0.0.0.0:0");
        this.broker.start();
        this.broker.waitUntilStarted();

        TransportConnector connector = this.broker.getTransportConnectors().get(0);
        this.connectionUri = connector.getPublishableConnectString();
        this.connectionFactory = new ActiveMQConnectionFactory(this.connectionUri);
        this.connectionFactory.setNonBlockingRedelivery(true);

        RedeliveryPolicy policy = this.connectionFactory.getRedeliveryPolicy();
        policy.setInitialRedeliveryDelay(TimeUnit.SECONDS.toMillis(2L));
        policy.setBackOffMultiplier(-1.0);
        policy.setRedeliveryDelay(TimeUnit.SECONDS.toMillis(2L));
        policy.setMaximumRedeliveryDelay(-1L);
        policy.setUseExponentialBackOff(false);
        policy.setMaximumRedeliveries(-1);
    }

    /** Stops the embedded broker, if {@link #startBroker()} got far enough to create one. */
    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }
}

