/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.File;
import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableConsumerTest
extends CombinationTestSupport {
    /** Logger shared by all tests and helper classes in this file. */
    private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerTest.class);
    /** Number of messages published and then expected back by {@code doTestConsumer}. */
    private static int COUNT = 1024;
    /** Used both as the JMS client id and as the durable subscription name in the single-consumer tests. */
    private static String CONSUMER_NAME = "DURABLE_TEST";
    /** Broker under test; created in {@code setUp} when null and replaced by tests that simulate a restart. */
    protected BrokerService broker;
    /** URI the broker's transport connector is added on and that {@code createConnectionFactory} connects to. */
    protected String bindAddress = "tcp://localhost:61616";
    /** Body written into every {@code BytesMessage} the tests send (32768 zero bytes). */
    protected byte[] payload = new byte[32768];
    /** Connection factory for the current test; assigned by the individual tests via {@code createConnectionFactory}. */
    protected ConnectionFactory factory;
    /** Exceptions collected from background threads and listeners; tests assert that it is empty. */
    protected Vector<Exception> exceptions = new Vector();
    /** Topic shared by the publisher and the subscribers in {@code testFailover}. */
    private static final String TOPIC_NAME = "failoverTopic";
    /**
     * Failover URI used by the {@code testFailover} clients. Only the 61616 endpoint is bound by
     * {@code configureBroker} in this file.
     */
    private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
    /**
     * Applied to both the broker and the connection factory. NOTE(review): presumably assigned by name
     * through {@code addCombinationValues("useDedicatedTaskRunner", ...)} - confirm against CombinationTestSupport.
     */
    public boolean useDedicatedTaskRunner = false;

    /**
     * Installs a KahaDB persistence adapter on {@code broker} whose data directory is
     * {@code target/<name of the running test>}.
     *
     * @param broker the broker whose persistence adapter is replaced
     * @throws Exception if the broker rejects the adapter
     */
    private void configurePersistence(BrokerService broker) throws Exception {
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setDirectory(new File("target/" + getName()));
        broker.setPersistenceAdapter(adapter);
    }

    /**
     * Starts a publisher and 100 durable subscribers over the failover transport, restarts the
     * broker underneath them, and verifies that no exception was recorded in {@link #exceptions}.
     *
     * @throws Exception if the broker cannot be (re)started or the subscribers never come up
     */
    public void testFailover() throws Exception {
        this.configurePersistence(this.broker);
        this.broker.start();

        // Daemon: the publisher loops indefinitely and must not keep the JVM alive after the run.
        final Thread publisherThread = new Thread(new MessagePublisher());
        publisherThread.setDaemon(true);
        publisherThread.start();

        final int numSubs = 100;
        // Vector rather than ArrayList: add() is invoked concurrently from the subscriber threads.
        final Vector<SimpleTopicSubscriber> subscribers = new Vector<SimpleTopicSubscriber>(numSubs);
        try {
            try {
                for (int i = 0; i < numSubs; i++) {
                    final int id = i;
                    Thread thread = new Thread(new Runnable() {

                        @Override
                        public void run() {
                            SimpleTopicSubscriber s = new SimpleTopicSubscriber(DurableConsumerTest.CONNECTION_URL, System.currentTimeMillis() + "-" + id, DurableConsumerTest.TOPIC_NAME);
                            subscribers.add(s);
                        }
                    });
                    thread.start();
                }
                boolean allCreated = Wait.waitFor(new Wait.Condition() {

                    public boolean isSatisified() throws Exception {
                        return numSubs == subscribers.size();
                    }
                });
                DurableConsumerTest.assertTrue("all " + numSubs + " subscribers created, got: " + subscribers.size(), allCreated);

                // Restart the broker while every client is connected through the failover transport.
                this.broker.stop();
                this.broker = this.createBroker(false);
                this.configurePersistence(this.broker);
                this.broker.start();
                Thread.sleep(10000L);
            } finally {
                // Iterate over a snapshot: on an early failure subscriber threads may still be adding.
                for (SimpleTopicSubscriber s : new ArrayList<SimpleTopicSubscriber>(subscribers)) {
                    s.closeConnection();
                }
            }
            DurableConsumerTest.assertTrue("no exceptions: " + this.exceptions, this.exceptions.isEmpty());
        } finally {
            // Interrupt only after the assertion so that stopping the publisher cannot add to 'exceptions' first.
            publisherThread.interrupt();
        }
    }

    /**
     * Registers both boolean values for the {@code useDedicatedTaskRunner} field as combination
     * values for {@code testConcurrentDurableConsumer}.
     */
    public void initCombosForTestConcurrentDurableConsumer() {
        Object[] taskRunnerModes = {Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("useDedicatedTaskRunner", taskRunnerModes);
    }

    /**
     * Runs a durable consumer that repeatedly connects, takes a single message, acknowledges only
     * every second one and disconnects, while a producer publishes {@code numMessages} messages.
     * The test passes when the consumer has counted {@code numMessages} deliveries and no exception
     * was recorded.
     *
     * @throws Exception if the broker, a connection or the wait fails
     */
    public void testConcurrentDurableConsumer() throws Exception {
        this.broker.start();
        this.broker.waitUntilStarted();
        this.factory = this.createConnectionFactory();
        final String topicName = this.getName();
        final int numMessages = 500;
        final int numConsumers = 1;
        final CountDownLatch consumerStarted = new CountDownLatch(numConsumers);
        final AtomicInteger receivedCount = new AtomicInteger();
        Runnable consumer = new Runnable() {

            @Override
            public void run() {
                final String consumerName = Thread.currentThread().getName();
                int acked = 0;
                int received = 0;
                try {
                    while (acked < numMessages / 2) {
                        // Take one message per connection; ack every second message only.
                        Connection consumerConnection = DurableConsumerTest.this.factory.createConnection();
                        try {
                            ((ActiveMQConnection) consumerConnection).setWatchTopicAdvisories(false);
                            consumerConnection.setClientID(consumerName);
                            Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                            Topic topic = consumerSession.createTopic(topicName);
                            consumerConnection.start();
                            TopicSubscriber subscriber = consumerSession.createDurableSubscriber(topic, consumerName);
                            consumerStarted.countDown();
                            Message msg;
                            do {
                                msg = subscriber.receive(5000L);
                                if (msg != null) {
                                    receivedCount.incrementAndGet();
                                    if (received != 0 && received % 100 == 0) {
                                        LOG.info("Received msg: " + msg.getJMSMessageID());
                                    }
                                    if (++received % 2 == 0) {
                                        msg.acknowledge();
                                        ++acked;
                                    }
                                }
                            } while (msg == null);
                        } finally {
                            // Close even when receive/acknowledge throws so the connection is not leaked.
                            consumerConnection.close();
                        }
                    }
                    TestCase.assertTrue(received >= acked);
                }
                catch (Exception e) {
                    e.printStackTrace();
                    DurableConsumerTest.this.exceptions.add(e);
                }
            }
        };
        ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
        for (int i = 0; i < numConsumers; ++i) {
            executor.execute(consumer);
        }
        DurableConsumerTest.assertTrue(consumerStarted.await(30L, TimeUnit.SECONDS));

        Connection producerConnection = this.factory.createConnection();
        try {
            ((ActiveMQConnection) producerConnection).setWatchTopicAdvisories(false);
            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = producerSession.createTopic(topicName);
            MessageProducer producer = producerSession.createProducer(topic);
            producerConnection.start();
            for (int i = 0; i < numMessages; ++i) {
                BytesMessage msg = producerSession.createBytesMessage();
                msg.writeBytes(this.payload);
                producer.send(msg);
                if (i != 0 && i % 100 == 0) {
                    LOG.info("Sent msg " + i);
                }
            }
            executor.shutdown();
            executor.awaitTermination(30L, TimeUnit.SECONDS);
            Wait.waitFor(new Wait.Condition() {

                public boolean isSatisified() throws Exception {
                    LOG.info("receivedCount: " + receivedCount.get());
                    return receivedCount.get() == numMessages;
                }
            }, 360000L);
            DurableConsumerTest.assertEquals("got required some messages", numMessages, receivedCount.get());
            DurableConsumerTest.assertTrue("no exceptions, but: " + this.exceptions, this.exceptions.isEmpty());
        } finally {
            producerConnection.close();
        }
    }

    /**
     * Runs the durable consumer scenario with the per-test persistence adapter installed
     * ({@code forceRecover == true}).
     */
    public void testConsumerRecover() throws Exception {
        final boolean forceRecover = true;
        doTestConsumer(forceRecover);
    }

    /**
     * Runs the durable consumer scenario with the broker's default store
     * ({@code forceRecover == false}).
     */
    public void testConsumer() throws Exception {
        final boolean forceRecover = false;
        doTestConsumer(forceRecover);
    }

    /**
     * Configures a default destination policy with a durable-topic prefetch of 150 and checks that
     * the {@code PrefetchSize} attribute of the resulting durable subscription matches.
     *
     * @throws Exception if the broker, the connection or the management lookup fails
     */
    public void testPrefetchViaBrokerConfig() throws Exception {
        Integer prefetchVal = 150;
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setDurableTopicPrefetch(prefetchVal.intValue());
        policyEntry.setPrioritizedMessages(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();

        this.factory = this.createConnectionFactory();
        Connection consumerConnection = this.factory.createConnection();
        try {
            consumerConnection.setClientID(CONSUMER_NAME);
            Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = consumerSession.createTopic(getClass().getName());
            // The subscriber itself is not used; creating it registers the durable subscription on the broker.
            consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
            consumerConnection.start();

            ObjectName activeSubscriptionObjectName = this.broker.getAdminView().getDurableTopicSubscribers()[0];
            Object prefetchFromSubView = this.broker.getManagementContext().getAttribute(activeSubscriptionObjectName, "PrefetchSize");
            DurableConsumerTest.assertEquals((Object) prefetchVal, prefetchFromSubView);
        } finally {
            // Previously the connection was left open until the broker was torn down.
            consumerConnection.close();
        }
    }

    /**
     * Durable consumer scenario shared by {@code testConsumer} and {@code testConsumerRecover}:
     * <ol>
     * <li>register a durable subscription and disconnect,</li>
     * <li>restart the broker and publish {@code COUNT} messages,</li>
     * <li>restart the broker again and expect the subscription to receive all {@code COUNT} messages.</li>
     * </ol>
     *
     * @param forceRecover when true, the per-test persistence adapter from
     *        {@code configurePersistence} is installed on every broker instance before it starts
     * @throws Exception if a broker or connection operation fails
     */
    public void doTestConsumer(boolean forceRecover) throws Exception {
        if (forceRecover) {
            this.configurePersistence(this.broker);
        }
        this.broker.start();
        this.factory = this.createConnectionFactory();

        // Phase 1: create the durable subscription, then go offline.
        Topic topic;
        Connection consumerConnection = this.factory.createConnection();
        try {
            consumerConnection.setClientID(CONSUMER_NAME);
            Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            topic = consumerSession.createTopic(getClass().getName());
            consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
            consumerConnection.start();
        } finally {
            consumerConnection.close();
        }
        this.restartBroker(forceRecover);

        // Phase 2: publish while the subscriber is offline.
        Connection producerConnection = this.factory.createConnection();
        try {
            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(topic);
            producerConnection.start();
            for (int i = 0; i < COUNT; ++i) {
                BytesMessage msg = producerSession.createBytesMessage();
                msg.writeBytes(this.payload);
                producer.send(msg);
                if (i != 0 && i % 1000 == 0) {
                    LOG.info("Sent msg " + i);
                }
            }
        } finally {
            producerConnection.close();
        }
        this.restartBroker(forceRecover);

        // Phase 3: reconnect the durable subscriber and drain everything that was published.
        consumerConnection = this.factory.createConnection();
        try {
            consumerConnection.setClientID(CONSUMER_NAME);
            consumerConnection.start();
            Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
            for (int i = 0; i < COUNT; ++i) {
                Message msg = consumer.receive(10000L);
                DurableConsumerTest.assertNotNull("Missing message: " + i, msg);
                if (i != 0 && i % 1000 == 0) {
                    LOG.info("Received msg " + i);
                }
            }
        } finally {
            // Close even when an assertion fails so the connection does not outlive the test.
            consumerConnection.close();
        }
    }

    /** Stops the current broker and starts a fresh one that keeps the existing store. */
    private void restartBroker(boolean forceRecover) throws Exception {
        this.broker.stop();
        this.broker = this.createBroker(false);
        if (forceRecover) {
            this.configurePersistence(this.broker);
        }
        this.broker.start();
    }

    /**
     * Creates a broker with a clean store unless one has already been assigned, then delegates to
     * the superclass set-up.
     */
    protected void setUp() throws Exception {
        boolean brokerMissing = (this.broker == null);
        if (brokerMissing) {
            this.broker = createBroker(true);
        }
        super.setUp();
    }

    /**
     * Runs the superclass tear-down and then stops the broker, waiting until it has fully stopped.
     * The broker is stopped even when the superclass tear-down throws.
     *
     * @throws Exception if the superclass tear-down or the broker shutdown fails
     */
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            BrokerService current = this.broker;
            // Clear the field first so that a failing stop() does not leave a stale broker referenced.
            this.broker = null;
            if (current != null) {
                current.stop();
                current.waitUntilStopped();
            }
        }
    }

    /**
     * Creates a topic through the given session.
     *
     * @param s the session used to create the topic
     * @param destinationName the topic name
     * @return the topic returned by {@code s.createTopic(destinationName)}
     * @throws JMSException if the session fails to create the topic
     */
    protected Topic creatTopic(Session s, String destinationName) throws JMSException {
        Topic created = s.createTopic(destinationName);
        return created;
    }

    /**
     * Builds a new, not yet started broker configured by {@code configureBroker}.
     *
     * @param deleteStore passed through to {@code configureBroker}
     * @return the configured broker
     * @throws Exception if configuration fails
     */
    protected BrokerService createBroker(boolean deleteStore) throws Exception {
        BrokerService configured = new BrokerService();
        configureBroker(configured, deleteStore);
        return configured;
    }

    /**
     * Applies the test configuration to {@code answer}: a KahaDB store under
     * {@code target/activemq-data/kahadb}, a connector on {@code bindAddress}, no shutdown hook,
     * no advisory support, and the current {@code useDedicatedTaskRunner} setting.
     *
     * @param answer the broker to configure
     * @param deleteStore when true, the store directory's children are deleted and the broker is
     *        told to delete all messages on startup
     * @throws Exception if the store or connector cannot be set up
     */
    protected void configureBroker(BrokerService answer, boolean deleteStore) throws Exception {
        answer.setDeleteAllMessagesOnStartup(deleteStore);

        KahaDBStore store = new KahaDBStore();
        File storeDir = new File("target/activemq-data/kahadb");
        if (deleteStore) {
            IOHelper.deleteChildren(storeDir);
        }
        store.setDirectory(storeDir);

        answer.setPersistenceAdapter(store);
        answer.addConnector(bindAddress);
        answer.setUseShutdownHook(false);
        answer.setAdvisorySupport(false);
        answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
    }

    /**
     * Creates a connection factory for {@code bindAddress} that uses the same
     * {@code useDedicatedTaskRunner} setting as the broker.
     *
     * @return a new connection factory
     * @throws Exception declared for overriding implementations
     */
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(bindAddress);
        connectionFactory.setUseDedicatedTaskRunner(useDedicatedTaskRunner);
        return connectionFactory;
    }

    /**
     * Builds the test suite for this class through the inherited {@code suite(Class)} helper.
     *
     * @return the suite for {@code DurableConsumerTest}
     */
    public static Test suite() {
        Test allTests = suite(DurableConsumerTest.class);
        return allTests;
    }

    /**
     * Runs the suite of this class with the JUnit text runner.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Test allTests = suite();
        TestRunner.run(allTests);
    }

    private class MessagePublisher
    implements Runnable {
        private final boolean shouldPublish = true;

        private MessagePublisher() {
        }

        @Override
        public void run() {
            ActiveMQConnectionFactory topicConnectionFactory = null;
            TopicConnection topicConnection = null;
            TopicSession topicSession = null;
            ActiveMQTopic topic = null;
            TopicPublisher topicPublisher = null;
            Message message = null;
            topicConnectionFactory = new ActiveMQConnectionFactory(DurableConsumerTest.CONNECTION_URL);
            try {
                topic = new ActiveMQTopic(DurableConsumerTest.TOPIC_NAME);
                topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false, 1);
                topicPublisher = topicSession.createPublisher((Topic)topic);
                message = topicSession.createMessage();
            }
            catch (Exception ex) {
                DurableConsumerTest.this.exceptions.add(ex);
            }
            while (true) {
                try {
                    topicPublisher.publish(message, 2, 1, 0x6DDD00L);
                }
                catch (JMSException ex) {
                    DurableConsumerTest.this.exceptions.add((Exception)((Object)ex));
                }
                try {
                    Thread.sleep(1L);
                }
                catch (Exception exception) {
                }
            }
        }
    }

    private class SimpleTopicSubscriber
    implements MessageListener,
    ExceptionListener {
        private TopicConnection topicConnection = null;

        public SimpleTopicSubscriber(String connectionURL, String clientId, String topicName) {
            ActiveMQConnectionFactory topicConnectionFactory = null;
            TopicSession topicSession = null;
            ActiveMQTopic topic = null;
            TopicSubscriber topicSubscriber = null;
            topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL);
            try {
                topic = new ActiveMQTopic(topicName);
                this.topicConnection = topicConnectionFactory.createTopicConnection();
                this.topicConnection.setClientID(clientId);
                this.topicConnection.start();
                topicSession = this.topicConnection.createTopicSession(false, 1);
                topicSubscriber = topicSession.createDurableSubscriber((Topic)topic, clientId);
                topicSubscriber.setMessageListener((MessageListener)this);
            }
            catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public void onMessage(Message arg0) {
        }

        public void closeConnection() {
            if (this.topicConnection != null) {
                try {
                    this.topicConnection.close();
                }
                catch (JMSException jMSException) {
                    // empty catch block
                }
            }
        }

        public void onException(JMSException exception) {
            DurableConsumerTest.this.exceptions.add((Exception)((Object)exception));
        }
    }
}

