/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Date;
import java.util.Set;
import java.util.Vector;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks the pending-queue statistic that the broker exposes over JMX for a
 * durable topic subscriber, both while the subscriber is disconnected and after
 * it has reconnected and consumed the backlog.
 *
 * <p>Each test starts its own broker (VM transport plus an ephemeral TCP
 * connector) in {@link #setUp()} and stops it in {@link #tearDown()}.
 */
public class TopicDurableConnectStatsTest extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(TopicDurableConnectStatsTest.class);

    /** Prefetch applied to every consumer type by {@link #createConnectionFactory()}. */
    private static final int PREFETCH = 10;

    /** Loop index at which the first durable consumer is disconnected. */
    private static final int DISCONNECT_CONSUMER_AT = 15;

    private BrokerService broker;
    private ActiveMQTopic topic;
    private final Vector<Throwable> exceptions = new Vector<>();
    /** Exact length, in characters, of every message body produced by {@link #createMessageText(int)}. */
    private final int messageSize = 4000;
    protected MBeanServerConnection mbeanServer;
    protected String domain = "org.apache.activemq";
    private ActiveMQConnectionFactory connectionFactory = null;
    /** Number of messages published by {@link #testPendingTopicStat()}. */
    final int numMessages = 20;

    /**
     * Transacted session committed by {@link Listener#onMessage(Message)}.
     * Assigned on the test thread and read on the JMS delivery thread, hence volatile.
     */
    private static volatile Session session2 = null;

    /**
     * Builds a VM-transport connection factory named after the current test, with
     * a prefetch of {@value #PREFETCH} for all consumer types and topic advisories
     * switched off.
     *
     * @return the configured factory (also stored in the {@code connectionFactory} field)
     */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setAll(PREFETCH);

        this.connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
        this.connectionFactory.setPrefetchPolicy(prefetchPolicy);
        this.connectionFactory.setWatchTopicAdvisories(false);
        return this.connectionFactory;
    }

    /**
     * Creates a started connection with the default client id {@code "cliName"}.
     */
    @Override
    protected Connection createConnection() throws Exception {
        return createConnection("cliName");
    }

    /**
     * Creates a connection, assigns it the given client id and starts it.
     *
     * @param name the JMS client id to set on the new connection
     * @return the started connection; the caller is responsible for closing it
     */
    protected Connection createConnection(String name) throws Exception {
        Connection con = super.createConnection();
        con.setClientID(name);
        con.start();
        return con;
    }

    public static Test suite() {
        return suite(TopicDurableConnectStatsTest.class);
    }

    /**
     * Resets recorded exceptions, creates the test topic, starts a fresh broker and
     * binds {@link #mbeanServer} to the platform MBean server before delegating to
     * the superclass set-up.
     */
    @Override
    protected void setUp() throws Exception {
        this.exceptions.clear();
        this.topic = (ActiveMQTopic) createDestination();
        createBroker();
        this.mbeanServer = ManagementFactory.getPlatformMBeanServer();
        super.setUp();
    }

    /**
     * Runs the superclass tear-down and then stops the broker. The broker is
     * stopped in a {@code finally} so that a failure in the superclass tear-down
     * cannot leave it running.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            destroyBroker();
        }
    }

    /** Starts a broker that deletes all persisted messages on start-up. */
    private void createBroker() throws Exception {
        createBroker(true);
    }

    /**
     * Creates and starts the broker used by the test.
     *
     * @param deleteAllMessages whether the broker should wipe its store on start-up
     */
    private void createBroker(boolean deleteAllMessages) throws Exception {
        String testName = getName(true);
        this.broker = BrokerFactory.createBroker("broker:(vm://" + testName + ")");
        this.broker.setBrokerName(testName);
        this.broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.broker.setAdvisorySupport(false);
        this.broker.addConnector("tcp://0.0.0.0:0");
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
    }

    /** Stops the broker if one was created; does nothing otherwise. */
    private void destroyBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    /**
     * Builds an {@link ObjectName} from {@code name} and logs whether it is
     * currently registered with {@link #mbeanServer}.
     *
     * <p>Despite its name this method does not fail when the bean is missing; it
     * only logs the outcome. An {@link IOException} raised while querying the
     * server is logged and otherwise ignored.
     *
     * @param name string form of the object name to look up
     * @return the parsed object name, whether or not it is registered
     * @throws MalformedObjectNameException if {@code name} is not a valid object name
     * @throws NullPointerException if {@code name} is {@code null}
     */
    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
        ObjectName objectName = new ObjectName(name);
        LOG.info("** Looking for {}", name);
        try {
            if (this.mbeanServer.isRegistered(objectName)) {
                LOG.info("Bean Registered: {}", objectName);
            } else {
                LOG.info("Couldn't find Mbean! {}", objectName);
            }
        } catch (IOException e) {
            LOG.warn("Failed to query MBean server for " + objectName, e);
        }
        return objectName;
    }

    /**
     * Publishes {@link #numMessages} persistent messages to a durable subscription
     * whose consumer is disconnected part-way through, asserts the pending queue
     * size reported over JMX, then reconnects the subscriber, drains the backlog
     * and asserts that the pending queue size drops back to zero.
     */
    public void testPendingTopicStat() throws Exception {
        Connection consumerCon = createConnection("cliId1");
        Connection producerCon = null;
        Connection con2 = null;
        try {
            Session consumerSession = consumerCon.createSession(true, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber consumer1 = consumerSession.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
            assertNotNull(consumer1);

            ObjectName query = new ObjectName(this.domain + ":type=Broker,brokerName=" + getName(true)
                    + ",destinationType=Topic,destinationName=" + this.topic.getTopicName()
                    + ",endpoint=Consumer,clientId=cliId1,consumerId=*");
            Set<ObjectName> set = this.mbeanServer.queryNames(query, null);
            // Fail with a readable message instead of a bare NoSuchElementException.
            assertFalse("subscription MBean registered for " + query, set.isEmpty());
            ObjectName subscriberObjName1 = set.iterator().next();
            DurableSubscriptionViewMBean subscriber1 = MBeanServerInvocationHandler.newProxyInstance(
                    this.mbeanServer, subscriberObjName1, DurableSubscriptionViewMBean.class, true);

            LOG.info("Beginning Pending Queue Size count: {}", subscriber1.getPendingQueueSize());
            LOG.info("Prefetch Limit: {}", subscriber1.getPrefetchSize());
            assertEquals("no pending", 0, subscriber1.getPendingQueueSize());
            assertEquals("Prefetch Limit ", PREFETCH, subscriber1.getPrefetchSize());

            producerCon = createConnection("x");
            Session producerSessions = producerCon.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSessions.createProducer(this.topic);
            producer.setDeliveryMode(2); // 2 == jakarta.jms.DeliveryMode.PERSISTENT

            int i;
            for (i = 0; i < this.numMessages; ++i) {
                if (i == DISCONNECT_CONSUMER_AT) {
                    // Drop the subscriber without ever committing its session.
                    LOG.info("Killing consumer at {}", DISCONNECT_CONSUMER_AT);
                    consumerSession.close();
                    consumerCon.close();
                }
                TextMessage message = producerSessions.createTextMessage(createMessageText(i));
                message.setJMSExpiration(0L);
                message.setStringProperty("filter", "true");
                producer.send(this.topic, message);
                producerSessions.commit();
            }
            LOG.info("Sent {} messages in total", i);
            producerCon.close();

            LOG.info("Pending Queue Size count: {}", subscriber1.getPendingQueueSize());
            assertEquals("pending as expected", this.numMessages, subscriber1.getPendingQueueSize());

            LOG.info("Re-connect client and consume messages");
            con2 = createConnection("cliId1");
            session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber consumer2 = session2.createDurableSubscriber(this.topic, "SubsId", "filter = 'true'", true);
            final Listener listener = new Listener();
            consumer2.setMessageListener(listener);

            assertTrue("received all sent", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return numMessages == listener.count;
                }
            }));
            LOG.info("Received: {}", listener.count);

            int pq = subscriber1.getPendingQueueSize();
            LOG.info("Pending Queue Size count: {}", pq);
            assertEquals("Pending queue after consumed", 0, pq);

            session2.close();
            con2.close();
            LOG.info("FINAL Pending Queue Size count (after consumer close): {}", subscriber1.getPendingQueueSize());
        } finally {
            // Release connections even when an assertion above fails; on the
            // success path they are already closed and these calls are repeats.
            closeQuietly(con2);
            closeQuietly(producerCon);
            closeQuietly(consumerCon);
        }
    }

    /**
     * Closes {@code connection} if it is non-null, logging rather than propagating
     * any failure so that clean-up never masks the original test failure.
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            LOG.warn("Failed to close connection during clean-up", e);
        }
    }

    /**
     * Builds a message body of exactly {@link #messageSize} characters: a short
     * header containing {@code index} and the current date, right-padded with
     * spaces (or truncated if the header alone is longer).
     *
     * @param index sequence number embedded in the header
     * @return the fixed-length body text
     */
    private String createMessageText(int index) {
        StringBuilder text = new StringBuilder(this.messageSize);
        text.append("Message: ").append(index).append(" sent at: ").append(new Date());
        if (text.length() > this.messageSize) {
            return text.substring(0, this.messageSize);
        }
        while (text.length() < this.messageSize) {
            text.append(' ');
        }
        return text.toString();
    }

    /**
     * Message listener that counts deliveries and commits the shared transacted
     * session after each one.
     */
    public static class Listener implements MessageListener {
        /**
         * Number of messages delivered so far. Volatile because it is incremented
         * in {@link #onMessage(Message)} and polled from the test thread.
         */
        volatile int count = 0;
        /** Optional label; when non-null each delivery is logged with this prefix. */
        String id = null;

        Listener() {
        }

        /**
         * Counts the message, commits {@code session2}, optionally logs the message
         * id, then pauses for 2 ms. Commit and logging failures are logged and do
         * not propagate to the caller.
         */
        @Override
        public void onMessage(Message message) {
            ++this.count;
            try {
                session2.commit();
            } catch (JMSException e) {
                LOG.error("Failed to commit session after receiving message", e);
            }
            if (this.id != null) {
                try {
                    LOG.info(this.id + ", " + message.getJMSMessageID());
                } catch (Exception e) {
                    // Logging the id is best-effort; report the problem and carry on.
                    LOG.warn("Unable to log JMS message id", e);
                }
            }
            try {
                Thread.sleep(2L);
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
    }
}

