/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.ArrayList;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises priority ordering on a queue whose destination policy is
 * configured (see {@link #createBroker()}) with prioritized messages, a queue
 * prefetch of zero, lazy dispatch and no cache.
 *
 * <p>Each test produces a batch of medium-priority messages followed by a
 * single high-priority message and verifies the order in which consumers
 * (and, in some tests, a queue browser) see them.
 */
public class QueueZeroPrefetchLazyDispatchPriorityTest {
    private static final Logger LOG = LoggerFactory.getLogger(QueueZeroPrefetchLazyDispatchPriorityTest.class);

    /** Fixed body written into every produced {@link BytesMessage}. */
    private static final byte[] PAYLOAD = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
    /** Number of produce/consume rounds each test performs against the same queue. */
    private static final int ITERATIONS = 2;
    /** Queue used by every test. */
    private static final String QUEUE_NAME = "TestQ";
    /** JMS priority of the bulk ("medium") messages. */
    private static final int MEDIUM_PRIORITY = 4;
    /** JMS priority of the single "high" message. */
    private static final int HIGH_PRIORITY = 5;
    /** How many medium-priority messages are produced per round in the ordering tests. */
    private static final int MEDIUM_COUNT = 4;

    private BrokerService broker;

    /** Starts a fresh broker before each test and waits until it is up. */
    @Before
    public void setUp() throws Exception {
        this.broker = this.createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    /** Stops the broker if {@link #setUp()} managed to create one. */
    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    /**
     * Produces four medium-priority messages and then one high-priority message;
     * a consumer created afterwards must receive the high-priority message first.
     */
    @Test(timeout=120000L)
    public void testPriorityMessages() throws Exception {
        for (int i = 0; i < ITERATIONS; ++i) {
            this.produceMessages(MEDIUM_COUNT, MEDIUM_PRIORITY, QUEUE_NAME);
            this.produceMessages(1, HIGH_PRIORITY, QUEUE_NAME);
            LOG.info("On iteration {}", i);
            Thread.sleep(1000L);
            ArrayList<Message> consumeList = this.consumeMessages(QUEUE_NAME);
            LOG.info("Consumed list {}", consumeList.size());
            // Fail with a readable assertion instead of IndexOutOfBoundsException
            // when fewer messages than expected were delivered.
            Assert.assertEquals("all produced messages should be consumed", MEDIUM_COUNT + 1, consumeList.size());
            Assert.assertEquals("message 1 should be priority high", HIGH_PRIORITY, consumeList.get(0).getJMSPriority());
            Assert.assertEquals("message 2 should be priority medium", MEDIUM_PRIORITY, consumeList.get(1).getJMSPriority());
            Assert.assertEquals("message 3 should be priority medium", MEDIUM_PRIORITY, consumeList.get(2).getJMSPriority());
            Assert.assertEquals("message 4 should be priority medium", MEDIUM_PRIORITY, consumeList.get(3).getJMSPriority());
            Assert.assertEquals("message 5 should be priority medium", MEDIUM_PRIORITY, consumeList.get(4).getJMSPriority());
        }
    }

    /**
     * Same scenario as {@link #testPriorityMessages()} but with pauses between
     * the two producer batches. Currently disabled.
     */
    @Test(timeout=120000L)
    @Ignore(value="Flaky test on Jenkins, should be refactored")
    public void testPriorityMessagesMoreThanPageSize() throws Exception {
        int numToSend = 5;
        for (int i = 0; i < ITERATIONS; ++i) {
            this.produceMessages(numToSend - 1, MEDIUM_PRIORITY, QUEUE_NAME);
            Thread.sleep(1000L);
            this.produceMessages(1, HIGH_PRIORITY, QUEUE_NAME);
            Thread.sleep(2000L);
            LOG.info("On iteration {}", i);
            ArrayList<Message> consumeList = this.consumeMessages(QUEUE_NAME);
            LOG.info("Consumed list {}", consumeList.size());
            Assert.assertFalse("Consumed list should not be empty", consumeList.isEmpty());
            Assert.assertEquals("message 1 should be priority high", HIGH_PRIORITY, consumeList.get(0).getJMSPriority());
            for (int j = 1; j < numToSend - 1; ++j) {
                Assert.assertEquals("message " + j + " should be priority medium", MEDIUM_PRIORITY, consumeList.get(j).getJMSPriority());
            }
        }
    }

    /**
     * Keeps a single consumer open across all rounds; on every round the one
     * message it pulls must be the freshly produced high-priority message.
     * Whatever is left on the queue afterwards must be medium priority.
     */
    @Test(timeout=120000L)
    public void testLongLivedPriorityConsumer() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        try (Connection connection = connectionFactory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(QUEUE_NAME));
            connection.start();
            for (int i = 0; i < ITERATIONS; ++i) {
                this.produceMessages(MEDIUM_COUNT, MEDIUM_PRIORITY, QUEUE_NAME);
                this.produceMessages(1, HIGH_PRIORITY, QUEUE_NAME);
                Message message = consumer.receive(4000L);
                // receive() returns null on timeout; report that as a test
                // failure rather than a NullPointerException on the next line.
                Assert.assertNotNull("no message received on iteration " + i, message);
                Assert.assertEquals("message should be priority high", HIGH_PRIORITY, message.getJMSPriority());
            }
        }
        ArrayList<Message> consumeList = this.consumeMessages(QUEUE_NAME);
        LOG.info("Consumed list {}", consumeList.size());
        for (Message message : consumeList) {
            Assert.assertEquals("should be priority medium", MEDIUM_PRIORITY, message.getJMSPriority());
        }
    }

    /**
     * Browses the queue between the medium and high priority batches, then
     * checks that the first consumed message is still the high-priority one
     * and that every remaining message is medium priority.
     */
    @Test(timeout=120000L)
    public void testPriorityMessagesWithJmsBrowser() throws Exception {
        for (int i = 0; i < ITERATIONS; ++i) {
            this.produceMessages(MEDIUM_COUNT, MEDIUM_PRIORITY, QUEUE_NAME);
            ArrayList<Message> browsed = this.browseMessages(QUEUE_NAME);
            LOG.info("Browsed: {}", browsed.size());
            this.produceMessages(1, HIGH_PRIORITY, QUEUE_NAME);
            Thread.sleep(1000L);
            LOG.info("On iteration {}", i);
            Message message = this.consumeOneMessage(QUEUE_NAME);
            Assert.assertNotNull(message);
            Assert.assertEquals(HIGH_PRIORITY, message.getJMSPriority());
            ArrayList<Message> consumeList = this.consumeMessages(QUEUE_NAME);
            LOG.info("Consumed list {}", consumeList.size());
            // The high-priority message was already taken above, so every
            // entry here (including index 0) must be medium priority.
            Assert.assertEquals("Iteration: " + i + ", remaining medium priority messages", MEDIUM_COUNT, consumeList.size());
            for (int j = 0; j < consumeList.size(); ++j) {
                Assert.assertEquals("Iteration: " + i + ", message " + j + " should be priority medium", MEDIUM_PRIORITY, consumeList.get(j).getJMSPriority());
            }
        }
    }

    /**
     * Verifies what a queue browser sees before and after a consumer pulls a
     * message: nothing initially, then exactly one message once a consumer
     * using {@link Session#CLIENT_ACKNOWLEDGE} has received (but never
     * acknowledged) one. All five messages must still be consumable afterwards.
     */
    @Test(timeout=120000L)
    public void testJmsBrowserGetsPagedIn() throws Exception {
        int numToSend = 5;
        for (int i = 0; i < ITERATIONS; ++i) {
            this.produceMessages(numToSend, MEDIUM_PRIORITY, QUEUE_NAME);
            ArrayList<Message> browsed = this.browseMessages(QUEUE_NAME);
            LOG.info("Browsed: {}", browsed.size());
            Assert.assertEquals(0, browsed.size());
            // Received in client-ack mode and never acknowledged.
            Message message = this.consumeOneMessage(QUEUE_NAME, Session.CLIENT_ACKNOWLEDGE);
            Assert.assertNotNull(message);
            browsed = this.browseMessages(QUEUE_NAME);
            LOG.info("Browsed: {}", browsed.size());
            Assert.assertEquals("see only the paged in for pull", 1, browsed.size());
            ArrayList<Message> consumeList = this.consumeMessages(QUEUE_NAME);
            LOG.info("Consumed list {}", consumeList.size());
            Assert.assertEquals(numToSend, consumeList.size());
        }
    }

    /**
     * Sends {@code numberOfMessages} persistent bytes messages with the given
     * priority to {@code queueName} over a new connection, which is closed
     * before returning.
     *
     * @param numberOfMessages how many messages to send
     * @param priority JMS priority applied to each message
     * @param queueName destination queue
     */
    private void produceMessages(int numberOfMessages, int priority, String queueName) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        // Tag the connection id with the priority so producers are distinguishable in logs.
        connectionFactory.setConnectionIDPrefix("pri-" + priority);
        try (Connection connection = connectionFactory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(new ActiveMQQueue(queueName));
            connection.start();
            for (int i = 0; i < numberOfMessages; ++i) {
                BytesMessage m = session.createBytesMessage();
                m.writeBytes(PAYLOAD);
                m.setJMSPriority(priority);
                // Time-to-live 0: the message does not expire.
                producer.send(m, javax.jms.DeliveryMode.PERSISTENT, m.getJMSPriority(), 0L);
            }
        }
    }

    /**
     * Drains {@code queueName} with a new auto-acknowledging consumer.
     * Waits up to five seconds for the first message and up to one second
     * for each subsequent one; the first timeout ends the drain.
     *
     * @param queueName queue to consume from
     * @return the messages in the order they were received (possibly empty)
     */
    private ArrayList<Message> consumeMessages(String queueName) throws Exception {
        ArrayList<Message> returnedMessages = new ArrayList<Message>();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        try (Connection connection = connectionFactory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(queueName));
            connection.start();
            Message message;
            while ((message = consumer.receive(returnedMessages.isEmpty() ? 5000L : 1000L)) != null) {
                returnedMessages.add(message);
            }
            consumer.close();
            return returnedMessages;
        }
    }

    /**
     * Receives a single message from {@code queueName} using
     * {@link Session#AUTO_ACKNOWLEDGE}.
     *
     * @param queueName queue to consume from
     * @return the received message, or {@code null} if none arrived in time
     */
    private Message consumeOneMessage(String queueName) throws Exception {
        return this.consumeOneMessage(queueName, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Receives a single message from {@code queueName} over a new connection
     * that is closed before returning. Waits four seconds, then retries once
     * for a further two seconds.
     *
     * @param queueName queue to consume from
     * @param ackMode JMS acknowledge mode for the session
     * @return the received message, or {@code null} if both attempts timed out
     */
    private Message consumeOneMessage(String queueName, int ackMode) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        try (Connection connection = connectionFactory.createConnection()) {
            Session session = connection.createSession(false, ackMode);
            MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(queueName));
            connection.start();
            Message message = consumer.receive(4000L);
            if (message == null) {
                message = consumer.receive(2000L);
            }
            return message;
        }
    }

    /**
     * Collects everything a new {@link QueueBrowser} on {@code queueName}
     * enumerates.
     *
     * @param queueName queue to browse
     * @return the browsed messages in enumeration order (possibly empty)
     */
    private ArrayList<Message> browseMessages(String queueName) throws Exception {
        ArrayList<Message> returnedMessages = new ArrayList<Message>();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        try (Connection connection = connectionFactory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueBrowser browser = session.createBrowser((Queue)new ActiveMQQueue(queueName));
            connection.start();
            Enumeration<?> enumeration = browser.getEnumeration();
            while (enumeration.hasMoreElements()) {
                returnedMessages.add((Message)enumeration.nextElement());
            }
            return returnedMessages;
        }
    }

    /**
     * Builds (but does not start) the broker under test: one TCP connector on
     * an ephemeral port and a single policy entry applied to every queue
     * ({@code ">"}).
     *
     * @return the configured, not yet started broker
     */
    private BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        PolicyMap policyMap = new PolicyMap();
        ArrayList<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        PolicyEntry pe = new PolicyEntry();
        pe.setPrioritizedMessages(true);
        pe.setExpireMessagesPeriod(500L);
        pe.setMaxPageSize(100);
        pe.setMaxExpirePageSize(0);
        pe.setMaxBrowsePageSize(0);
        pe.setQueuePrefetch(0);
        pe.setLazyDispatch(true);
        pe.setOptimizedDispatch(true);
        pe.setUseCache(false);
        pe.setQueue(">");
        entries.add(pe);
        policyMap.setPolicyEntries(entries);
        broker.setDestinationPolicy(policyMap);
        // Port 0 lets the OS pick a free port; tests look up the real address
        // through getTransportConnectorByScheme("tcp").
        broker.addConnector("tcp://0.0.0.0:0");
        return broker;
    }
}

