/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ4212Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4212Test.class);
    private BrokerService service;
    private String connectionUri;
    private ActiveMQConnectionFactory cf;
    private final int MSG_COUNT = 256;

    @Before
    public void setUp() throws IOException, Exception {
        this.createBroker(true, false);
    }

    public void createBroker(boolean deleteAllMessages, boolean recover) throws Exception {
        this.service = new BrokerService();
        this.service.setBrokerName("InactiveSubTest");
        this.service.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.service.setAdvisorySupport(false);
        this.service.setPersistent(true);
        this.service.setUseJmx(true);
        this.service.setKeepDurableSubsActive(false);
        KahaDBPersistenceAdapter pa = new KahaDBPersistenceAdapter();
        File dataFile = new File("KahaDB");
        pa.setDirectory(dataFile);
        pa.setJournalMaxFileLength(10240);
        pa.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5L));
        pa.setCleanupInterval(TimeUnit.SECONDS.toMillis(5L));
        pa.setForceRecoverIndex(recover);
        pa.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL.name());
        this.service.setPersistenceAdapter((PersistenceAdapter)pa);
        this.service.start();
        this.service.waitUntilStarted();
        this.connectionUri = "vm://InactiveSubTest?create=false";
        this.cf = new ActiveMQConnectionFactory(this.connectionUri);
    }

    private void restartBroker() throws Exception {
        this.stopBroker();
        this.createBroker(false, false);
    }

    private void recoverBroker() throws Exception {
        this.stopBroker();
        this.createBroker(false, true);
    }

    @After
    public void stopBroker() throws Exception {
        if (this.service != null) {
            this.service.stop();
            this.service.waitUntilStopped();
            this.service = null;
        }
    }

    @Test
    public void testDurableSubPrefetchRecovered() throws Exception {
        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
        this.sendMessages((Destination)queue);
        LOG.info("There are currently [{}] journal log files.", (Object)this.getNumberOfJournalFiles());
        this.createInactiveDurableSub((Topic)topic);
        Assert.assertTrue((String)"Should have an inactive durable sub", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                ObjectName[] subs = AMQ4212Test.this.service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1;
            }
        }));
        this.sendMessages((Destination)queue);
        LOG.info("There are currently [{}] journal log files.", (Object)this.getNumberOfJournalFiles());
        Assert.assertTrue((this.getNumberOfJournalFiles() > 1 ? 1 : 0) != 0);
        LOG.info("Restarting the broker.");
        this.restartBroker();
        LOG.info("Restarted the broker.");
        LOG.info("There are currently [{}] journal log files.", (Object)this.getNumberOfJournalFiles());
        Assert.assertTrue((this.getNumberOfJournalFiles() > 1 ? 1 : 0) != 0);
        Assert.assertTrue((String)"Should have an inactive durable sub", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                ObjectName[] subs = AMQ4212Test.this.service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1;
            }
        }));
        this.service.getAdminView().removeQueue(queue.getQueueName());
        Assert.assertTrue((String)("Less than two journal files expected, was " + this.getNumberOfJournalFiles()), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ4212Test.this.getNumberOfJournalFiles() <= 2;
            }
        }, (long)TimeUnit.MINUTES.toMillis(2L)));
        LOG.info("Sending {} Messages to the Topic.", (Object)256);
        this.sendMessages((Destination)topic);
        LOG.info("Attempt to consume {} messages from the Topic.", (Object)256);
        Assert.assertEquals((long)256L, (long)this.consumeFromInactiveDurableSub((Topic)topic));
        LOG.info("Recovering the broker.");
        this.recoverBroker();
        LOG.info("Recovering the broker.");
        Assert.assertTrue((String)"Should have an inactive durable sub", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                ObjectName[] subs = AMQ4212Test.this.service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1;
            }
        }));
    }

    @Test
    public void testDurableAcksNotDropped() throws Exception {
        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
        this.createInactiveDurableSub((Topic)topic);
        Assert.assertTrue((String)"Should have an inactive durable sub", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                ObjectName[] subs = AMQ4212Test.this.service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1;
            }
        }));
        this.sendMessages((Destination)topic, 1);
        this.sendMessages((Destination)queue);
        LOG.info("Before consume there are currently [{}] journal log files.", (Object)this.getNumberOfJournalFiles());
        this.consumeDurableMessages((Topic)topic, 1);
        LOG.info("After consume there are currently [{}] journal log files.", (Object)this.getNumberOfJournalFiles());
        this.sendMessages((Destination)queue);
        LOG.info("More Queued. There are currently [{}] journal log files.", (Object)this.getNumberOfJournalFiles());
        Assert.assertTrue((this.getNumberOfJournalFiles() > 1 ? 1 : 0) != 0);
        LOG.info("Restarting the broker.");
        this.restartBroker();
        LOG.info("Restarted the broker.");
        LOG.info("There are currently [{}] journal log files.", (Object)this.getNumberOfJournalFiles());
        Assert.assertTrue((this.getNumberOfJournalFiles() > 1 ? 1 : 0) != 0);
        Assert.assertTrue((String)"Should have an inactive durable sub", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                ObjectName[] subs = AMQ4212Test.this.service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1;
            }
        }));
        this.service.getAdminView().removeQueue(queue.getQueueName());
        Assert.assertTrue((String)("Less than three journal file expected, was " + this.getNumberOfJournalFiles()), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ4212Test.this.getNumberOfJournalFiles() <= 3;
            }
        }, (long)TimeUnit.MINUTES.toMillis(3L)));
        this.tryConsumeExpectNone((Topic)topic);
        LOG.info("There are currently [{}] journal log files.", (Object)this.getNumberOfJournalFiles());
        LOG.info("Recovering the broker.");
        this.recoverBroker();
        LOG.info("Recovering the broker.");
        LOG.info("There are currently [{}] journal log files.", (Object)this.getNumberOfJournalFiles());
        Assert.assertTrue((String)"Should have an inactive durable sub", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                ObjectName[] subs = AMQ4212Test.this.service.getAdminView().getInactiveDurableTopicSubscribers();
                return subs != null && subs.length == 1;
            }
        }));
        this.tryConsumeExpectNone((Topic)topic);
        Assert.assertTrue((String)("Less than three journal file expected, was " + this.getNumberOfJournalFiles()), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return AMQ4212Test.this.getNumberOfJournalFiles() == 1;
            }
        }, (long)TimeUnit.MINUTES.toMillis(1L)));
    }

    private int getNumberOfJournalFiles() throws IOException {
        Collection files = ((KahaDBPersistenceAdapter)this.service.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
        int reality = 0;
        for (DataFile file : files) {
            if (file == null) continue;
            ++reality;
        }
        return reality;
    }

    private void createInactiveDurableSub(Topic topic) throws Exception {
        Connection connection = this.cf.createConnection();
        connection.setClientID("Inactive");
        Session session = connection.createSession(false, 1);
        TopicSubscriber consumer = session.createDurableSubscriber(topic, "Inactive");
        consumer.close();
        connection.close();
    }

    private void consumeDurableMessages(Topic topic, int count) throws Exception {
        Connection connection = this.cf.createConnection();
        connection.setClientID("Inactive");
        Session session = connection.createSession(false, 1);
        TopicSubscriber consumer = session.createDurableSubscriber(topic, "Inactive");
        connection.start();
        for (int i = 0; i < count; ++i) {
            if (consumer.receive(TimeUnit.SECONDS.toMillis(10L)) != null) continue;
            Assert.fail((String)"should have received a message");
        }
        consumer.close();
        connection.close();
    }

    private void tryConsumeExpectNone(Topic topic) throws Exception {
        Connection connection = this.cf.createConnection();
        connection.setClientID("Inactive");
        Session session = connection.createSession(false, 1);
        TopicSubscriber consumer = session.createDurableSubscriber(topic, "Inactive");
        connection.start();
        if (consumer.receive(TimeUnit.SECONDS.toMillis(10L)) != null) {
            Assert.fail((String)"Should be no messages for this durable.");
        }
        consumer.close();
        connection.close();
    }

    private int consumeFromInactiveDurableSub(Topic topic) throws Exception {
        Connection connection = this.cf.createConnection();
        connection.setClientID("Inactive");
        connection.start();
        Session session = connection.createSession(false, 1);
        TopicSubscriber consumer = session.createDurableSubscriber(topic, "Inactive");
        int count = 0;
        while (consumer.receive(10000L) != null) {
            ++count;
        }
        consumer.close();
        connection.close();
        return count;
    }

    private void sendMessages(Destination destination) throws Exception {
        this.sendMessages(destination, 256);
    }

    private void sendMessages(Destination destination, int count) throws Exception {
        Connection connection = this.cf.createConnection();
        Session session = connection.createSession(false, 1);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(2);
        for (int i = 0; i < count; ++i) {
            TextMessage message = session.createTextMessage("Message #" + i + " for destination: " + String.valueOf(destination));
            producer.send((Message)message);
        }
        connection.close();
    }
}

