/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.TestUtils;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DuplexStartNpeTest {
    private static final Logger LOG = LoggerFactory.getLogger(DuplexStartNpeTest.class);
    final ActiveMQQueue dest = new ActiveMQQueue("QQ");
    final List<BrokerService> brokerServices = new ArrayList<BrokerService>();
    final List<Connection> connections = new ArrayList<Connection>();
    static final String urlString = "tcp://localhost:" + TestUtils.findOpenPort();
    static final int NUM_MESSAGES = 10;

    @Test
    public void reproduceNpe() throws Exception {
        BrokerService broker0 = this.createBroker();
        NetworkConnector networkConnector = broker0.addNetworkConnector("masterslave:(" + urlString + "," + urlString + ")");
        networkConnector.setDuplex(true);
        networkConnector.setStaticBridge(true);
        networkConnector.setStaticallyIncludedDestinations(Arrays.asList(this.dest));
        broker0.start();
        this.publish(broker0.getVmConnectorURI());
        BrokerService broker1 = this.createBroker();
        broker1.addConnector(urlString);
        broker1.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
                super.addConnection(context, info);
                if (info.getClientId() != null && info.getClientId().contains("_duplex_")) {
                    LOG.info("New connection for broker1: " + info);
                    TimeUnit.MILLISECONDS.sleep(500L);
                }
            }
        }});
        broker1.start();
        this.consume(new URI(urlString));
    }

    private void consume(URI uri) throws Exception {
        MessageConsumer messageConsumer = this.connectionFactory(uri).createConnection().createSession(false, 1).createConsumer((Destination)this.dest);
        for (int i = 0; i < 10; ++i) {
            TestCase.assertNotNull((String)("got message: " + i), (Object)messageConsumer.receive(5000L));
        }
    }

    private void publish(URI uri) throws Exception {
        MessageProducer messageProducer = this.connectionFactory(uri).createConnection().createSession(false, 1).createProducer((Destination)this.dest);
        for (int i = 0; i < 10; ++i) {
            messageProducer.send((Message)new ActiveMQTextMessage());
        }
    }

    private ActiveMQConnectionFactory connectionFactory(URI uri) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uri){

            public Connection createConnection() throws JMSException {
                Connection connection = super.createConnection();
                DuplexStartNpeTest.this.connections.add(connection);
                connection.start();
                return connection;
            }
        };
        connectionFactory.setWatchTopicAdvisories(false);
        return connectionFactory;
    }

    private BrokerService createBroker() {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("B" + this.brokerServices.size());
        brokerService.setBrokerId(brokerService.getBrokerName());
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setAdvisorySupport(false);
        this.brokerServices.add(brokerService);
        return brokerService;
    }

    @After
    public void tearDown() throws Exception {
        for (Connection connection : this.connections) {
            try {
                connection.close();
            }
            catch (Exception exception) {}
        }
        this.connections.clear();
        for (BrokerService brokerService : this.brokerServices) {
            try {
                brokerService.stop();
            }
            catch (Exception exception) {}
        }
        this.brokerServices.clear();
    }
}

