/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.virtual;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VirtualTopicFanoutPerfTest {
    private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicFanoutPerfTest.class);
    int numConsumers = 100;
    int total = 500;
    BrokerService brokerService;
    ConnectionFactory connectionFactory;

    @Before
    public void createBroker() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        this.brokerService.start();
        for (DestinationInterceptor destinationInterceptor : this.brokerService.getDestinationInterceptors()) {
            for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor)destinationInterceptor).getVirtualDestinations()) {
                if (!(virtualDestination instanceof VirtualTopic)) continue;
                ((VirtualTopic)virtualDestination).setConcurrentSend(true);
                ((VirtualTopic)virtualDestination).setTransactedSend(true);
            }
        }
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
        zeroPrefetch.setAll(0);
        activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
        this.connectionFactory = activeMQConnectionFactory;
    }

    @After
    public void stopBroker() throws Exception {
        this.brokerService.stop();
    }

    @Test
    @Ignore(value="comparison test - concurrentSend=true virtual topic, use transaction")
    public void testFanoutDuration() throws Exception {
        Connection connection1 = this.connectionFactory.createConnection();
        connection1.start();
        Session session = connection1.createSession(false, 1);
        for (int i = 0; i < this.numConsumers; ++i) {
            session.createConsumer((Destination)new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST"));
        }
        Connection connection2 = this.connectionFactory.createConnection();
        connection2.start();
        Session producerSession = connection2.createSession(false, 1);
        MessageProducer producer = producerSession.createProducer((Destination)new ActiveMQTopic("VirtualTopic.TEST"));
        long start = System.currentTimeMillis();
        LOG.info("Starting producer: " + start);
        for (int i = 0; i < this.total; ++i) {
            producer.send((Message)producerSession.createTextMessage("message: " + i));
        }
        LOG.info("Done producer, duration: " + (System.currentTimeMillis() - start));
        try {
            connection1.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            connection2.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }
}

