/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region.cursors;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursorTest;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class KahaDBPendingMessageCursorTest
extends AbstractPendingMessageCursorTest {
    protected static final Logger LOG = LoggerFactory.getLogger(KahaDBPendingMessageCursorTest.class);
    @Rule
    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));

    @Parameterized.Parameters(name="prioritizedMessages={0},enableSubscriptionStatistics={1}")
    public static Collection<Object[]> data() {
        return Arrays.asList({true, true}, {true, false}, {false, true}, {false, false});
    }

    public KahaDBPendingMessageCursorTest(boolean prioritizedMessages, boolean enableSubscriptionStatistics) {
        super(prioritizedMessages);
        this.enableSubscriptionStatistics = enableSubscriptionStatistics;
    }

    @Override
    protected void setUpBroker(boolean clearDataDir) throws Exception {
        if (clearDataDir && this.dataFileDir.getRoot().exists()) {
            FileUtils.cleanDirectory((File)this.dataFileDir.getRoot());
        }
        super.setUpBroker(clearDataDir);
    }

    @Override
    protected void initPersistence(BrokerService brokerService) throws IOException {
        this.broker.setPersistent(true);
        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
        persistenceAdapter.setDirectory(this.dataFileDir.getRoot());
        persistenceAdapter.setEnableSubscriptionStatistics(this.enableSubscriptionStatistics);
        this.broker.setPersistenceAdapter((PersistenceAdapter)persistenceAdapter);
    }

    @Test
    public void testDurableMessageSizeAfterRestartAndPublish() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID("clientId");
        connection.start();
        Topic topic = this.publishTestMessagesDurable(connection, new String[]{"sub1"}, 200, publishedMessageSize, 2);
        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
        this.verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
        this.verifyStoreStats((Destination)topic, 200, publishedMessageSize.get());
        long beforeRestartSize = ((DurableTopicSubscription)topic.getDurableTopicSubs().get(subKey)).getPendingMessageSize();
        Assert.assertEquals((long)beforeRestartSize, (long)topic.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
        this.stopBroker();
        this.setUpBroker(false);
        topic = (Topic)this.getBroker().getDestination((ActiveMQDestination)new ActiveMQTopic(this.defaultTopicName));
        Assert.assertEquals((long)beforeRestartSize, (long)((DurableTopicSubscription)topic.getDurableTopicSubs().get(subKey)).getPendingMessageSize());
        connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID("clientId");
        connection.start();
        topic = this.publishTestMessagesDurable(connection, new String[]{"sub1"}, 200, publishedMessageSize, 2);
        this.verifyPendingStats(topic, subKey, 400, publishedMessageSize.get());
        this.verifyStoreStats((Destination)topic, 400, publishedMessageSize.get());
    }

    @Test
    public void testMessageSizeTwoDurablesPartialConsumption() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID("clientId");
        connection.start();
        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
        SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
        Topic dest = this.publishTestMessagesDurable(connection, new String[]{"sub1", "sub2"}, 200, publishedMessageSize, 2);
        this.verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
        this.verifyStoreStats((Destination)dest, 200, publishedMessageSize.get());
        this.consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
        this.verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
        this.verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
        this.verifyStoreStats((Destination)dest, 200, publishedMessageSize.get());
        connection.close();
    }

    @Test
    public void testNonPersistentDurableMessageSize() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID("clientId");
        connection.start();
        Topic topic = this.publishTestMessagesDurable(connection, new String[]{"sub1"}, 200, publishedMessageSize, 1);
        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
        this.verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
        this.verifyStoreStats((Destination)topic, 0, 0L);
    }

    @Test
    public void testEnabledSubscriptionStatistics() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID("clientId");
        connection.start();
        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
        SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
        Topic dest = this.publishTestMessagesDurable(connection, new String[]{"sub1", "sub2"}, 200, publishedMessageSize, 2);
        TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
        MessageStoreSubscriptionStatistics stats = store.getMessageStoreSubStatistics();
        if (this.enableSubscriptionStatistics) {
            Assert.assertTrue((stats.getMessageCount(subKey.toString()).getCount() == 200L ? 1 : 0) != 0);
            Assert.assertTrue((stats.getMessageSize(subKey.toString()).getTotalSize() > 0L ? 1 : 0) != 0);
            Assert.assertTrue((stats.getMessageCount(subKey2.toString()).getCount() == 200L ? 1 : 0) != 0);
            Assert.assertTrue((stats.getMessageSize(subKey2.toString()).getTotalSize() > 0L ? 1 : 0) != 0);
            Assert.assertEquals((long)stats.getMessageCount().getCount(), (long)(stats.getMessageCount(subKey.toString()).getCount() + stats.getMessageSize(subKey.toString()).getCount()));
            Assert.assertEquals((long)stats.getMessageSize().getTotalSize(), (long)(stats.getMessageSize(subKey.toString()).getTotalSize() + stats.getMessageSize(subKey2.toString()).getTotalSize()));
            store.deleteSubscription(subKey2.getClientId(), subKey2.getSubscriptionName());
            Assert.assertEquals((long)stats.getMessageCount().getCount(), (long)stats.getMessageCount(subKey.toString()).getCount());
            Assert.assertEquals((long)stats.getMessageSize().getTotalSize(), (long)stats.getMessageSize(subKey.toString()).getTotalSize());
            Assert.assertTrue((stats.getMessageCount(subKey2.toString()).getCount() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((stats.getMessageSize(subKey2.toString()).getTotalSize() == 0L ? 1 : 0) != 0);
        } else {
            Assert.assertTrue((stats.getMessageCount(subKey.toString()).getCount() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((stats.getMessageSize(subKey.toString()).getTotalSize() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((stats.getMessageCount(subKey2.toString()).getCount() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((stats.getMessageSize(subKey2.toString()).getTotalSize() == 0L ? 1 : 0) != 0);
            Assert.assertEquals((long)0L, (long)stats.getMessageCount().getCount());
            Assert.assertEquals((long)0L, (long)stats.getMessageSize().getTotalSize());
        }
    }

    @Test
    public void testUpdateMessageSubSize() throws Exception {
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID("clientId");
        connection.start();
        Session session = connection.createSession(false, 1);
        javax.jms.Topic dest = session.createTopic(this.defaultTopicName);
        session.createDurableSubscriber(dest, "sub1");
        session.createDurableSubscriber(dest, "sub2");
        MessageProducer prod = session.createProducer((javax.jms.Destination)dest);
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText("SmallMessage");
        prod.send((javax.jms.Message)message);
        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
        SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub1");
        Topic topic = (Topic)this.getBroker().getDestination((ActiveMQDestination)new ActiveMQTopic(this.defaultTopicName));
        DurableTopicSubscription sub = (DurableTopicSubscription)topic.getDurableTopicSubs().get(subKey);
        DurableTopicSubscription sub2 = (DurableTopicSubscription)topic.getDurableTopicSubs().get(subKey2);
        long sizeBeforeUpdate = sub.getPendingMessageSize();
        message = (ActiveMQTextMessage)topic.getMessageStore().getMessage(message.getMessageId());
        message.setText("LargerMessageLargerMessage");
        topic.getMessageStore().updateMessage((Message)message);
        Assert.assertTrue((sub.getPendingMessageSize() > sizeBeforeUpdate + 10L ? 1 : 0) != 0);
        Assert.assertEquals((long)sub.getPendingMessageSize(), (long)topic.getMessageStore().getMessageSize());
        Assert.assertEquals((long)sub.getPendingMessageSize(), (long)sub2.getPendingMessageSize());
    }

    @Test
    public void testUpdateMessageSubSizeAfterConsume() throws Exception {
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID("clientId");
        connection.start();
        Session session = connection.createSession(false, 1);
        javax.jms.Topic dest = session.createTopic(this.defaultTopicName);
        session.createDurableSubscriber(dest, "sub1");
        TopicSubscriber subscriber2 = session.createDurableSubscriber(dest, "sub2");
        MessageProducer prod = session.createProducer((javax.jms.Destination)dest);
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText("SmallMessage");
        ActiveMQTextMessage message2 = new ActiveMQTextMessage();
        message2.setText("SmallMessage2");
        prod.send((javax.jms.Message)message);
        prod.send((javax.jms.Message)message2);
        subscriber2.receive();
        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
        SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
        Topic topic = (Topic)this.getBroker().getDestination((ActiveMQDestination)new ActiveMQTopic(this.defaultTopicName));
        final DurableTopicSubscription sub = (DurableTopicSubscription)topic.getDurableTopicSubs().get(subKey);
        final DurableTopicSubscription sub2 = (DurableTopicSubscription)topic.getDurableTopicSubs().get(subKey2);
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return sub.getPendingMessageSize() > sub2.getPendingMessageSize();
            }
        });
        long sizeBeforeUpdate = sub.getPendingMessageSize();
        long sizeBeforeUpdate2 = sub2.getPendingMessageSize();
        message = (ActiveMQTextMessage)topic.getMessageStore().getMessage(message.getMessageId());
        message.setText("LargerMessageLargerMessage");
        topic.getMessageStore().updateMessage((Message)message);
        Assert.assertTrue((sub.getPendingMessageSize() > sizeBeforeUpdate + 10L ? 1 : 0) != 0);
        Assert.assertEquals((long)sub.getPendingMessageSize(), (long)topic.getMessageStore().getMessageSize());
        Assert.assertTrue((sub.getPendingMessageSize() > 2L * sub2.getPendingMessageSize() ? 1 : 0) != 0);
        Assert.assertEquals((long)sizeBeforeUpdate2, (long)sub2.getPendingMessageSize());
    }
}

