/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.virtual;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VirtualTopicConcurrentSendDeleteTest {
    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicConcurrentSendDeleteTest.class);
    BrokerService brokerService;
    ConnectionFactory connectionFactory;

    @Before
    public void createBroker() throws Exception {
        this.createBroker(true);
    }

    public void createBroker(boolean delete) throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(delete);
        this.brokerService.setAdvisorySupport(false);
        ((KahaDBPersistenceAdapter)this.brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
        this.brokerService.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        activeMQConnectionFactory.setWatchTopicAdvisories(false);
        activeMQConnectionFactory.setAlwaysSyncSend(false);
        ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
        zeroPrefetch.setAll(0);
        activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
        this.connectionFactory = activeMQConnectionFactory;
    }

    @After
    public void stopBroker() throws Exception {
        this.brokerService.stop();
    }

    @Test
    public void testConsumerQueueDeleteOk() throws Exception {
        boolean numConnections = true;
        int numDestinations = 10;
        int numMessages = 4000;
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        this.brokerService.getRegionBroker().addDestination(this.brokerService.getAdminConnectionContext(), (ActiveMQDestination)new ActiveMQTopic("VirtualTopic.TEST"), false);
        for (int i = 0; i < 10; ++i) {
            this.brokerService.getRegionBroker().addDestination(this.brokerService.getAdminConnectionContext(), (ActiveMQDestination)new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST"), false);
        }
        final CountDownLatch doneOne = new CountDownLatch(1);
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    int messagestoSend = 0;
                    Connection connection1 = VirtualTopicConcurrentSendDeleteTest.this.connectionFactory.createConnection();
                    connection1.start();
                    Session session = connection1.createSession(false, 1);
                    MessageProducer producer = session.createProducer(null);
                    do {
                        producer.send((Destination)new ActiveMQTopic("VirtualTopic.TEST"), (Message)new ActiveMQMessage());
                        if (++messagestoSend != 1000) continue;
                        doneOne.countDown();
                    } while (messagestoSend < 4000);
                    connection1.close();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 1; ++i) {
            executorService.execute(runnable);
        }
        String prefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=";
        executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    doneOne.await(30L, TimeUnit.SECONDS);
                    for (int i = 9; i >= 0; --i) {
                        ActiveMQQueue toDelete = new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST");
                        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + toDelete.getQueueName());
                        QueueViewMBean proxy = (QueueViewMBean)VirtualTopicConcurrentSendDeleteTest.this.brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
                        LOG.info("Q len: " + toDelete.getQueueName() + ", " + proxy.getQueueSize());
                        VirtualTopicConcurrentSendDeleteTest.this.brokerService.getAdminView().removeQueue(toDelete.getPhysicalName());
                        TimeUnit.MILLISECONDS.sleep(100L);
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        executorService.shutdown();
        executorService.awaitTermination(5L, TimeUnit.MINUTES);
        LOG.info("Enqueues: " + ((RegionBroker)this.brokerService.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
        int numQueues = ((RegionBroker)this.brokerService.getRegionBroker()).getQueueRegion().getDestinationMap().size();
        LOG.info("Destinations: " + numQueues);
        Assert.assertEquals((String)"no queues left", (long)0L, (long)numQueues);
        Assert.assertEquals((String)"no queues, just one topic, in kahadb", (long)1L, (long)this.brokerService.getPersistenceAdapter().getDestinations().size());
    }
}

