/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.store.AbstractStoreStatTestSupport;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractMessageStoreSizeStatTest
extends AbstractStoreStatTestSupport {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractMessageStoreSizeStatTest.class);
    protected BrokerService broker;
    protected URI brokerConnectURI;
    protected String defaultQueueName = "test.queue";
    protected String defaultTopicName = "test.topic";

    @Before
    public void startBroker() throws Exception {
        this.setUpBroker(true);
    }

    protected void setUpBroker(boolean clearDataDir) throws Exception {
        this.broker = new BrokerService();
        this.initPersistence(this.broker);
        TransportConnector connector = this.broker.addConnector(new TransportConnector());
        connector.setUri(new URI("tcp://0.0.0.0:0"));
        connector.setName("tcp");
        this.broker.start();
        this.broker.waitUntilStarted();
        this.brokerConnectURI = this.broker.getConnectorByName("tcp").getConnectUri();
    }

    @After
    public void stopBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    @Override
    protected BrokerService getBroker() {
        return this.broker;
    }

    @Override
    protected URI getBrokerConnectURI() {
        return this.brokerConnectURI;
    }

    protected abstract void initPersistence(BrokerService var1) throws IOException;

    @Test(timeout=60000L)
    public void testMessageSize() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Destination dest = this.publishTestQueueMessages(200, publishedMessageSize);
        this.verifyStats(dest, 200, publishedMessageSize.get());
    }

    @Test(timeout=60000L)
    public void testMessageSizeAfterConsumption() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Destination dest = this.publishTestQueueMessages(200, publishedMessageSize);
        this.verifyStats(dest, 200, publishedMessageSize.get());
        this.consumeTestQueueMessages();
        this.verifyStats(dest, 0, 0L);
    }

    @Test(timeout=60000L)
    public void testMessageSizeOneDurable() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID("clientId");
        connection.start();
        Destination dest = this.publishTestMessagesDurable(connection, new String[]{"sub1"}, 200, 200, publishedMessageSize);
        this.verifyStats(dest, 200, publishedMessageSize.get());
        this.consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
        this.verifyStats(dest, 0, 0L);
        connection.close();
    }

    @Test(timeout=60000L)
    public void testMessageSizeTwoDurables() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        connection.setClientID("clientId");
        connection.start();
        Destination dest = this.publishTestMessagesDurable(connection, new String[]{"sub1", "sub2"}, 200, 200, publishedMessageSize);
        this.verifyStats(dest, 200, publishedMessageSize.get());
        this.consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
        this.verifyStats(dest, 200, publishedMessageSize.get());
        connection.stop();
    }

    @Test
    public void testMessageSizeAfterDestinationDeletion() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Destination dest = this.publishTestQueueMessages(200, publishedMessageSize);
        this.verifyStats(dest, 200, publishedMessageSize.get());
        this.broker.removeDestination(dest.getActiveMQDestination());
        this.verifyStats(dest, 0, 0L);
    }

    @Test
    public void testQueueBrowserMessageSize() throws Exception {
        AtomicLong publishedMessageSize = new AtomicLong();
        Destination dest = this.publishTestQueueMessages(200, publishedMessageSize);
        this.browseTestQueueMessages(dest.getName());
        this.verifyStats(dest, 200, publishedMessageSize.get());
    }

    protected void verifyStats(Destination dest, final int count, final long minimumSize) throws Exception {
        final MessageStore messageStore = dest.getMessageStore();
        final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
        Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return count == messageStore.getMessageCount() && (long)messageStore.getMessageCount() == storeStats.getMessageCount().getCount() && messageStore.getMessageSize() == messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize();
            }
        }));
        if (count > 0) {
            Assert.assertTrue((storeStats.getMessageSize().getTotalSize() > minimumSize ? 1 : 0) != 0);
            Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return storeStats.getMessageSize().getTotalSize() > minimumSize;
                }
            }));
        } else {
            Assert.assertTrue((boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return storeStats.getMessageSize().getTotalSize() == 0L;
                }
            }));
        }
    }

    protected Destination publishTestQueueMessages(int count, AtomicLong publishedMessageSize) throws Exception {
        return this.publishTestQueueMessages(count, this.defaultQueueName, 2, AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize);
    }

    protected Destination publishTestQueueMessages(int count, String queueName, AtomicLong publishedMessageSize) throws Exception {
        return this.publishTestQueueMessages(count, queueName, 2, AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize);
    }

    protected Destination consumeTestQueueMessages() throws Exception {
        return this.consumeTestQueueMessages(this.defaultQueueName);
    }

    protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, AtomicLong publishedMessageSize) throws Exception {
        return this.consumeDurableTestMessages(connection, sub, size, this.defaultTopicName, publishedMessageSize);
    }

    protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize, int expectedSize, AtomicLong publishedMessageSize) throws Exception {
        return this.publishTestMessagesDurable(connection, subNames, this.defaultTopicName, publishSize, expectedSize, AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize, true);
    }
}

