/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.net.URI;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ1853Test {
    private static BrokerService broker;
    private static final Logger LOG;
    static final String jmsConnectionURI = "failover:(vm://localhost)";
    private static final String queueFail = "Queue.BlockingConsumer.QueueFail";
    private final int producerMessages = 5;
    private final int totalNumberMessages = 10;
    private final int maxRedeliveries = 2;
    private final int redeliveryDelay = 1000;
    private Map<String, AtomicInteger> messageList = null;

    /**
     * Creates a broker from the {@code broker:()/localhost?persistent=false} URI, disables JMX,
     * asks it to delete all messages on startup, and blocks until it has started.
     *
     * @throws Exception if the broker URI is invalid or the broker fails to start
     */
    @Before
    public void setUp() throws Exception {
        final URI brokerUri = new URI("broker:()/localhost?persistent=false");
        final BrokerService service = BrokerFactory.createBroker(brokerUri);
        // Publish the instance right away so tearDown() can stop it even if start-up fails.
        broker = service;
        service.setUseJmx(false);
        service.setDeleteAllMessagesOnStartup(true);
        service.start();
        service.waitUntilStarted();
    }

    /**
     * Stops the broker created by {@link #setUp()}, waits for it to finish stopping, and clears
     * the shared reference. Does nothing when no broker is set.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        final BrokerService running = broker;
        if (running == null) {
            return;
        }
        running.stop();
        running.waitUntilStopped();
        broker = null;
    }

    /**
     * Runs one {@link TestProducer} twice (the second run is started about one second after the
     * first) against a {@link TestConsumer} that rolls back every delivery, then checks that:
     * <ul>
     *   <li>{@code totalNumberMessages} message ids were registered in {@code messageList};</li>
     *   <li>every message was delivered {@code maxRedeliveries + 1} times;</li>
     *   <li>the consumer's "receipt is ordered" flag ended up {@code false}.</li>
     * </ul>
     * The consumer is always asked to stop, even when an assertion fails.
     *
     * @throws Exception if the test thread is interrupted while sleeping or waiting, or if a
     *                   wait condition throws
     */
    @Test
    public void testConsumerMessagesAreNotOrdered() throws Exception {
        // One initial delivery plus every permitted redelivery.
        final int expectedDeliveries = this.maxRedeliveries + 1;
        TestConsumer consumerAllFail = null;
        this.messageList = new Hashtable<String, AtomicInteger>();
        try {
            TestProducer producerAllFail = new TestProducer(queueFail);
            AMQ1853Test.thread(producerAllFail, false);
            consumerAllFail = new TestConsumer(queueFail, true);
            AMQ1853Test.thread(consumerAllFail, false);
            Thread.sleep(1000L);
            // Second run of the same producer; both runs count down the same latch.
            AMQ1853Test.thread(producerAllFail, false);
            Thread.sleep(1000L);
            producerAllFail.getLatch().await();
            LOG.info("producer successful, count = " + producerAllFail.getLatch().getCount());
            LOG.info("final message list size =  " + this.messageList.size());

            // Wait until every sent message id has been registered. The size used by the
            // assertion is read only after the wait, so a failure reports the final size
            // rather than the size observed before waiting.
            Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return AMQ1853Test.this.totalNumberMessages == AMQ1853Test.this.messageList.size();
                }
            });
            Assert.assertEquals("message list size", this.totalNumberMessages, this.messageList.size());

            consumerAllFail.getLatch().await();
            LOG.info("consumerAllFail successful, count = " + consumerAllFail.getLatch().getCount());

            // Iterate entries so each key is paired with its own counter by construction.
            for (Map.Entry<String, AtomicInteger> entry : this.messageList.entrySet()) {
                String message = entry.getKey();
                int deliveries = entry.getValue().get();
                LOG.info("final count for message " + message + " counter =  " + deliveries);
                Assert.assertEquals("for message " + message + " counter", expectedDeliveries, deliveries);
            }
            Assert.assertFalse(consumerAllFail.messageReceiptIsOrdered());
        }
        finally {
            if (consumerAllFail != null) {
                consumerAllFail.setStop(true);
            }
        }
    }

    /**
     * Starts {@code runnable} on a new thread.
     *
     * @param runnable the task to execute
     * @param daemon   whether the new thread is marked as a daemon thread before it is started
     * @return the thread, already started
     */
    private static Thread thread(Runnable runnable, boolean daemon) {
        final Thread worker = new Thread(runnable);
        worker.setDaemon(daemon);
        worker.start();
        return worker;
    }

    // Static initializer: assigns the class-wide SLF4J logger, since the LOG field is declared
    // without an initializer.
    static {
        LOG = LoggerFactory.getLogger(AMQ1853Test.class);
    }

    private class TestConsumer
    implements Runnable,
    ExceptionListener,
    MessageListener {
        private CountDownLatch latch = null;
        private int receivedMessageCounter = 0;
        private boolean bFakeFail = false;
        String destinationName = null;
        boolean bMessageReceiptIsOrdered = true;
        boolean bStop = false;
        String previousMessageId = null;
        private ActiveMQConnectionFactory connectionFactory = null;
        private ActiveMQConnection connection = null;
        private Session session = null;
        private MessageConsumer consumer = null;

        public TestConsumer(String destinationName, boolean bFakeFail) {
            this.bFakeFail = bFakeFail;
            this.latch = new CountDownLatch(10 * (this.bFakeFail ? 3 : 1));
            this.destinationName = destinationName;
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        public boolean messageReceiptIsOrdered() {
            return this.bMessageReceiptIsOrdered;
        }

        @Override
        public void run() {
            try {
                LOG.info("Started TestConsumer for destination (" + this.destinationName + ")");
                this.connectionFactory = new ActiveMQConnectionFactory(AMQ1853Test.jmsConnectionURI);
                this.connection = (ActiveMQConnection)this.connectionFactory.createConnection();
                this.connection.setNonBlockingRedelivery(true);
                this.session = this.connection.createSession(true, 0);
                RedeliveryPolicy policy = this.connection.getRedeliveryPolicy();
                policy.setInitialRedeliveryDelay(1000L);
                policy.setBackOffMultiplier(-1.0);
                policy.setRedeliveryDelay(1000L);
                policy.setMaximumRedeliveryDelay(-1L);
                policy.setUseExponentialBackOff(false);
                policy.setMaximumRedeliveries(2);
                this.connection.setExceptionListener((ExceptionListener)this);
                Queue destination = this.session.createQueue(this.destinationName);
                this.consumer = this.session.createConsumer((Destination)destination);
                this.consumer.setMessageListener((MessageListener)this);
                this.connection.start();
                while (!this.bStop) {
                    Thread.sleep(100L);
                }
                LOG.info("Finished TestConsumer for destination name (" + this.destinationName + ") remaining " + this.latch.getCount() + " messages " + this.toString());
            }
            catch (Exception e) {
                LOG.error("Consumer (" + this.destinationName + ") Caught: " + e);
            }
            finally {
                try {
                    if (this.consumer != null) {
                        this.consumer.close();
                    }
                    if (this.session != null) {
                        this.session.close();
                    }
                    if (this.connection != null) {
                        this.connection.close();
                    }
                }
                catch (Exception e) {
                    LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e);
                }
            }
        }

        public synchronized void onException(JMSException ex) {
            LOG.error("Consumer for destination, (" + this.destinationName + "), JMS Exception occured.  Shutting down client.");
        }

        public synchronized void setStop(boolean bStop) {
            this.bStop = bStop;
        }

        public synchronized void onMessage(Message message) {
            ++this.receivedMessageCounter;
            this.latch.countDown();
            LOG.info("Consumer for destination (" + this.destinationName + ") latch countdown: " + this.latch.getCount() + " :: Number messages received " + this.receivedMessageCounter);
            try {
                if (this.receivedMessageCounter % 3 == 1) {
                    this.previousMessageId = message.getJMSMessageID();
                }
                if (this.bMessageReceiptIsOrdered) {
                    this.bMessageReceiptIsOrdered = this.previousMessageId.trim().equals(message.getJMSMessageID());
                }
                final String jmsMessageId = message.getJMSMessageID();
                Assert.assertTrue((String)"Did not find expected ", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                    public boolean isSatisified() throws Exception {
                        return AMQ1853Test.this.messageList.containsKey(jmsMessageId);
                    }
                }));
                AtomicInteger counter = AMQ1853Test.this.messageList.get(jmsMessageId);
                counter.incrementAndGet();
                LOG.info("Consumer for destination (" + this.destinationName + ")\n" + message.getJMSMessageID() + " = currentMessageId\n" + this.previousMessageId + " = previousMessageId\n" + this.bMessageReceiptIsOrdered + "= bMessageReceiptIsOrdered\n>>LATENCY " + (System.currentTimeMillis() - message.getLongProperty("TestTime")) + "\nmessage counter = " + counter.get());
                if (!this.bFakeFail) {
                    LOG.debug("Consumer on destination " + this.destinationName + " committing JMS Session for message: " + message.toString());
                    this.session.commit();
                } else {
                    LOG.debug("Consumer on destination " + this.destinationName + " rolling back JMS Session for message: " + message.toString());
                    this.session.rollback();
                }
            }
            catch (Exception ex) {
                ex.printStackTrace();
                LOG.error("Error reading JMS Message from destination " + this.destinationName + ".");
            }
        }
    }

    private class TestProducer
    implements Runnable {
        private CountDownLatch latch = null;
        private String destinationName = null;

        public TestProducer(String destinationName) {
            this.destinationName = destinationName;
            this.latch = new CountDownLatch(10);
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ActiveMQConnectionFactory connectionFactory = null;
            ActiveMQConnection connection = null;
            ActiveMQSession session = null;
            Queue destination = null;
            try {
                LOG.info("Started TestProducer for destination (" + this.destinationName + ")");
                connectionFactory = new ActiveMQConnectionFactory(AMQ1853Test.jmsConnectionURI);
                connection = (ActiveMQConnection)connectionFactory.createConnection();
                connection.setCopyMessageOnSend(false);
                connection.start();
                session = (ActiveMQSession)connection.createSession(false, 1);
                destination = session.createQueue(this.destinationName);
                ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer((Destination)destination);
                producer.setDeliveryMode(1);
                for (int i = 0; i < 5; ++i) {
                    TextMessage message = session.createTextMessage();
                    message.setLongProperty("TestTime", System.currentTimeMillis());
                    try {
                        producer.send((Message)message);
                        LOG.info("Producer (" + this.destinationName + ")\n" + message.getJMSMessageID() + " = sent messageId\n");
                        this.latch.countDown();
                        LOG.info(" Latch count  " + this.latch.getCount());
                        LOG.info("Producer message list size = " + AMQ1853Test.this.messageList.keySet().size());
                        AMQ1853Test.this.messageList.put(message.getJMSMessageID(), new AtomicInteger(0));
                        LOG.info("Producer message list size = " + AMQ1853Test.this.messageList.keySet().size());
                    }
                    catch (Exception deeperException) {
                        LOG.info("Producer for destination (" + this.destinationName + ") Caught: " + deeperException);
                    }
                    Thread.sleep(1000L);
                }
                LOG.info("Finished TestProducer for destination (" + this.destinationName + ")");
            }
            catch (Exception e) {
                LOG.error("Terminating TestProducer(" + this.destinationName + ")Caught: " + e);
            }
            finally {
                try {
                    if (session != null) {
                        session.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                }
                catch (Exception e) {
                    LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e);
                }
            }
        }
    }
}

