/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageListenerRedeliveryTest {
    private static final Logger LOG = LoggerFactory.getLogger(MessageListenerRedeliveryTest.class);
    @Rule
    public TestName name = new TestName();
    private Connection connection;

    @Before
    public void setUp() throws Exception {
        this.connection = this.createConnection();
    }

    @After
    public void tearDown() throws Exception {
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
    }

    protected String getTestName() {
        return this.name.getMethodName();
    }

    protected RedeliveryPolicy getRedeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(1000L);
        redeliveryPolicy.setMaximumRedeliveries(3);
        redeliveryPolicy.setBackOffMultiplier(2.0);
        redeliveryPolicy.setUseExponentialBackOff(true);
        return redeliveryPolicy;
    }

    protected Connection createConnection() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&marshal=true");
        factory.setRedeliveryPolicy(this.getRedeliveryPolicy());
        return factory.createConnection();
    }

    @Test(timeout=60000L)
    public void testQueueRollbackConsumerListener() throws JMSException {
        this.connection.start();
        Session session = this.connection.createSession(true, 2);
        Queue queue = session.createQueue("queue-" + this.getTestName());
        MessageProducer producer = this.createProducer(session, (Destination)queue);
        TextMessage message = this.createTextMessage(session);
        producer.send((Message)message);
        session.commit();
        MessageConsumer consumer = session.createConsumer((Destination)queue);
        ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer)consumer;
        mc.setRedeliveryPolicy(this.getRedeliveryPolicy());
        TestMessageListener listener = new TestMessageListener(session);
        consumer.setMessageListener((MessageListener)listener);
        try {
            Thread.sleep(500L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertEquals((long)2L, (long)listener.counter);
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertEquals((long)3L, (long)listener.counter);
        try {
            Thread.sleep(2000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertEquals((long)4L, (long)listener.counter);
        producer.send((Message)this.createTextMessage(session));
        session.commit();
        try {
            Thread.sleep(500L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertEquals((long)5L, (long)listener.counter);
        try {
            Thread.sleep(1500L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertEquals((long)5L, (long)listener.counter);
        session.close();
    }

    @Test(timeout=60000L)
    public void testQueueRollbackSessionListener() throws JMSException {
        this.connection.start();
        Session session = this.connection.createSession(true, 2);
        Queue queue = session.createQueue("queue-" + this.getTestName());
        MessageProducer producer = this.createProducer(session, (Destination)queue);
        TextMessage message = this.createTextMessage(session);
        producer.send((Message)message);
        session.commit();
        MessageConsumer consumer = session.createConsumer((Destination)queue);
        ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer)consumer;
        mc.setRedeliveryPolicy(this.getRedeliveryPolicy());
        TestMessageListener listener = new TestMessageListener(session);
        consumer.setMessageListener((MessageListener)listener);
        try {
            Thread.sleep(500L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertEquals((long)2L, (long)listener.counter);
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertEquals((long)3L, (long)listener.counter);
        try {
            Thread.sleep(2000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertEquals((long)4L, (long)listener.counter);
        producer.send((Message)this.createTextMessage(session));
        session.commit();
        try {
            Thread.sleep(500L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertEquals((long)5L, (long)listener.counter);
        try {
            Thread.sleep(1500L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertEquals((long)5L, (long)listener.counter);
        session.close();
    }

    @Test(timeout=60000L)
    public void testQueueSessionListenerExceptionRetry() throws Exception {
        int i;
        this.connection.start();
        Session session = this.connection.createSession(false, 1);
        Queue queue = session.createQueue("queue-" + this.getTestName());
        MessageProducer producer = this.createProducer(session, (Destination)queue);
        TextMessage message = this.createTextMessage(session, "1");
        producer.send((Message)message);
        message = this.createTextMessage(session, "2");
        producer.send((Message)message);
        MessageConsumer consumer = session.createConsumer((Destination)queue);
        final CountDownLatch gotMessage = new CountDownLatch(2);
        final AtomicInteger count = new AtomicInteger(0);
        final int maxDeliveries = this.getRedeliveryPolicy().getMaximumRedeliveries();
        final ArrayList received = new ArrayList();
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                LOG.info("Message Received: " + String.valueOf(message));
                try {
                    received.add(((TextMessage)message).getText());
                }
                catch (JMSException e) {
                    e.printStackTrace();
                    Assert.fail((String)e.toString());
                }
                if (count.incrementAndGet() < maxDeliveries) {
                    throw new RuntimeException(MessageListenerRedeliveryTest.this.getTestName() + " force a redelivery");
                }
                count.set(0);
                gotMessage.countDown();
            }
        });
        Assert.assertTrue((String)"got message before retry expiry", (boolean)gotMessage.await(20L, TimeUnit.SECONDS));
        for (i = 0; i < maxDeliveries; ++i) {
            Assert.assertEquals((String)("got first redelivered: " + i), (Object)"1", received.get(i));
        }
        for (i = maxDeliveries; i < maxDeliveries * 2; ++i) {
            Assert.assertEquals((String)("got first redelivered: " + i), (Object)"2", received.get(i));
        }
        session.close();
    }

    @Test(timeout=60000L)
    public void testQueueSessionListenerExceptionDlq() throws Exception {
        this.connection.start();
        Session session = this.connection.createSession(false, 1);
        Queue queue = session.createQueue("queue-" + this.getTestName());
        MessageProducer producer = this.createProducer(session, (Destination)queue);
        TextMessage message = this.createTextMessage(session);
        producer.send((Message)message);
        final Message[] dlqMessage = new Message[1];
        ActiveMQQueue dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
        MessageConsumer dlqConsumer = session.createConsumer((Destination)dlqDestination);
        final CountDownLatch gotDlqMessage = new CountDownLatch(1);
        dlqConsumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                LOG.info("DLQ Message Received: " + String.valueOf(message));
                dlqMessage[0] = message;
                gotDlqMessage.countDown();
            }
        });
        MessageConsumer consumer = session.createConsumer((Destination)queue);
        int maxDeliveries = this.getRedeliveryPolicy().getMaximumRedeliveries();
        final CountDownLatch gotMessage = new CountDownLatch(maxDeliveries);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                LOG.info("Message Received: " + String.valueOf(message));
                gotMessage.countDown();
                throw new RuntimeException(MessageListenerRedeliveryTest.this.getTestName() + " force a redelivery");
            }
        });
        Assert.assertTrue((String)"got message before retry expiry", (boolean)gotMessage.await(20L, TimeUnit.SECONDS));
        Assert.assertTrue((String)"got dlq message", (boolean)gotDlqMessage.await(20L, TimeUnit.SECONDS));
        message = dlqMessage[0];
        Assert.assertNotNull((String)"dlq message captured", (Object)message);
        String cause = message.getStringProperty("dlqDeliveryFailureCause");
        LOG.info("DLQ'd message cause reported as: {}", (Object)cause);
        Assert.assertTrue((String)"cause 'cause' exception is remembered", (boolean)cause.contains("RuntimeException"));
        Assert.assertTrue((String)"is correct exception", (boolean)cause.contains(this.getTestName()));
        Assert.assertTrue((String)"cause exception is remembered", (boolean)cause.contains("Throwable"));
        Assert.assertTrue((String)"cause policy is remembered", (boolean)cause.contains("RedeliveryPolicy"));
        Assert.assertTrue((String)"cause redelivered count is remembered", (boolean)cause.contains("[" + (maxDeliveries + 1) + "]"));
        session.close();
    }

    @Test(timeout=60000L)
    public void testTransactedQueueSessionListenerExceptionDlq() throws Exception {
        this.connection.start();
        final Session session = this.connection.createSession(true, 0);
        Queue queue = session.createQueue("queue-" + this.getTestName());
        MessageProducer producer = this.createProducer(session, (Destination)queue);
        TextMessage message = this.createTextMessage(session);
        producer.send((Message)message);
        session.commit();
        final Message[] dlqMessage = new Message[1];
        ActiveMQQueue dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
        MessageConsumer dlqConsumer = session.createConsumer((Destination)dlqDestination);
        final CountDownLatch gotDlqMessage = new CountDownLatch(1);
        dlqConsumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                LOG.info("DLQ Message Received: " + String.valueOf(message));
                dlqMessage[0] = message;
                gotDlqMessage.countDown();
            }
        });
        MessageConsumer consumer = session.createConsumer((Destination)queue);
        int maxDeliveries = this.getRedeliveryPolicy().getMaximumRedeliveries();
        final CountDownLatch gotMessage = new CountDownLatch(maxDeliveries);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                LOG.info("Message Received: " + String.valueOf(message));
                gotMessage.countDown();
                try {
                    session.rollback();
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
                throw new RuntimeException(MessageListenerRedeliveryTest.this.getTestName() + " force a redelivery");
            }
        });
        Assert.assertTrue((String)"got message before retry expiry", (boolean)gotMessage.await(20L, TimeUnit.SECONDS));
        Assert.assertTrue((String)"got dlq message", (boolean)gotDlqMessage.await(20L, TimeUnit.SECONDS));
        message = dlqMessage[0];
        Assert.assertNotNull((String)"dlq message captured", (Object)message);
        String cause = message.getStringProperty("dlqDeliveryFailureCause");
        LOG.info("DLQ'd message cause reported as: {}", (Object)cause);
        Assert.assertTrue((String)"cause 'cause' exception is remembered", (boolean)cause.contains("RuntimeException"));
        Assert.assertTrue((String)"is correct exception", (boolean)cause.contains(this.getTestName()));
        Assert.assertTrue((String)"cause exception is remembered", (boolean)cause.contains("Throwable"));
        Assert.assertTrue((String)"cause policy is remembered", (boolean)cause.contains("RedeliveryPolicy"));
        session.close();
    }

    private TextMessage createTextMessage(Session session, String text) throws JMSException {
        return session.createTextMessage(text);
    }

    private TextMessage createTextMessage(Session session) throws JMSException {
        return session.createTextMessage("Hello");
    }

    private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(this.getDeliveryMode());
        return producer;
    }

    protected int getDeliveryMode() {
        return 2;
    }

    private class TestMessageListener
    implements MessageListener {
        public int counter;
        private final Session session;

        public TestMessageListener(Session session) {
            this.session = session;
        }

        public void onMessage(Message message) {
            try {
                LOG.info("Message Received: " + String.valueOf(message));
                ++this.counter;
                if (this.counter <= 4) {
                    LOG.info("Message Rollback.");
                    this.session.rollback();
                } else {
                    LOG.info("Message Commit.");
                    message.acknowledge();
                    this.session.commit();
                }
            }
            catch (JMSException e) {
                LOG.error("Error when rolling back transaction");
            }
        }
    }
}

