/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.net.InetAddress;
import java.net.Socket;
import java.net.URI;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.TestUtils;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DuplexAdvisoryRaceTest {
    private static final Logger LOG = LoggerFactory.getLogger(DuplexAdvisoryRaceTest.class);
    private static String hostName;
    final AtomicLong responseReceived = new AtomicLong(0L);
    BrokerService brokerA;
    BrokerService brokerB;
    String networkConnectorUrlString;

    @BeforeClass
    public static void initIp() throws Exception {
        hostName = InetAddress.getLocalHost().getHostAddress();
    }

    @Before
    public void createBrokers() throws Exception {
        this.networkConnectorUrlString = "tcp://" + hostName + ":" + TestUtils.findOpenPort();
        this.brokerA = this.newBroker("A");
        this.brokerB = this.newBroker("B");
        this.responseReceived.set(0L);
    }

    @After
    public void stopBrokers() throws Exception {
        this.brokerA.stop();
        this.brokerB.stop();
    }

    public void repeatTestHang() throws Exception {
        for (int i = 0; i < 10; ++i) {
            this.testHang();
            this.stopBrokers();
            this.createBrokers();
        }
    }

    @Test
    public void testHang() throws Exception {
        this.brokerA.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
                Subscription subscription = super.addConsumer(context, info);
                if (context.isNetworkConnection()) {
                    TimeUnit.MILLISECONDS.sleep(100L);
                }
                return subscription;
            }
        }});
        NetworkConnector networkConnector = this.bridgeBrokers(this.brokerA, this.brokerB);
        this.brokerA.start();
        this.brokerB.start();
        ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(this.brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString() + "?jms.watchTopicAdvisories=false");
        ActiveMQConnectionFactory brokerBFactory = new ActiveMQConnectionFactory(this.brokerB.getTransportConnectorByScheme("tcp").getPublishableConnectString() + "?jms.watchTopicAdvisories=false");
        int numDests = 400;
        int numMessagesPerDest = 50;
        int numConsumersPerDest = 5;
        this.populate(brokerAFactory, 0, 200, 50);
        this.populate(brokerBFactory, 200, 400, 50);
        LinkedList<Connection> connections = new LinkedList<Connection>();
        connections.add(this.demand(brokerBFactory, 0, 200, 5));
        connections.add(this.demand(brokerAFactory, 200, 400, 5));
        LOG.info("Allow duplex bridge to connect....");
        this.brokerB.startTransportConnector(this.brokerB.addConnector(this.networkConnectorUrlString + "?transport.socketBufferSize=1024"));
        if (!Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("received: " + DuplexAdvisoryRaceTest.this.responseReceived.get());
                return DuplexAdvisoryRaceTest.this.responseReceived.get() >= 20000L;
            }
        }, (long)1800000L)) {
            TestSupport.dumpAllThreads((String)"DD");
            for (NetworkBridge networkBridge : networkConnector.activeBridges()) {
                if (!(networkBridge instanceof DemandForwardingBridge)) continue;
                DemandForwardingBridge demandForwardingBridge = (DemandForwardingBridge)networkBridge;
                Socket socket = (Socket)demandForwardingBridge.getRemoteBroker().narrow(Socket.class);
                socket.close();
            }
        }
        networkConnector.stop();
        for (Connection connection : connections) {
            try {
                connection.close();
            }
            catch (Exception exception) {}
        }
        Assert.assertTrue((String)("received all sent: " + this.responseReceived.get()), (this.responseReceived.get() >= 20000L ? 1 : 0) != 0);
    }

    private void populate(ActiveMQConnectionFactory factory, int minDest, int maxDest, int numMessages) throws JMSException {
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, 1);
        BytesMessage message = session.createBytesMessage();
        MessageProducer producer = session.createProducer(null);
        for (int i = minDest; i < maxDest; ++i) {
            Destination destination = this.qFromInt(i);
            for (int j = 0; j < numMessages; ++j) {
                producer.send(destination, (Message)message);
            }
        }
        connection.close();
    }

    private Connection demand(ActiveMQConnectionFactory factory, int minDest, int maxDest, int numConsumers) throws Exception {
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, 1);
        for (int i = minDest; i < maxDest; ++i) {
            Destination destination = this.qFromInt(i);
            for (int j = 0; j < numConsumers; ++j) {
                session.createConsumer(destination).setMessageListener(new MessageListener(){

                    public void onMessage(Message message) {
                        DuplexAdvisoryRaceTest.this.responseReceived.incrementAndGet();
                    }
                });
            }
        }
        connection.start();
        return connection;
    }

    private Destination qFromInt(int val) {
        StringBuilder builder = new StringBuilder();
        String digits = String.format("%03d", val);
        for (int i = 0; i < 3; ++i) {
            builder.append(digits.charAt(i));
            if (i >= 2) continue;
            builder.append('.');
        }
        return new ActiveMQQueue("Test." + builder.toString());
    }

    private BrokerService newBroker(String name) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setBrokerName(name);
        brokerService.addConnector("tcp://" + hostName + ":0?transport.socketBufferSize=1024");
        PolicyMap map = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setExpireMessagesPeriod(0L);
        map.setDefaultEntry(defaultEntry);
        brokerService.setDestinationPolicy(map);
        return brokerService;
    }

    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
        String uri = "static:(failover:(" + this.networkConnectorUrlString + "?socketBufferSize=1024&trace=false)?maxReconnectAttempts=0)";
        DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
        connector.setName(localBroker.getBrokerName() + "-to-" + remoteBroker.getBrokerName());
        connector.setDuplex(true);
        localBroker.addNetworkConnector((NetworkConnector)connector);
        return connector;
    }
}

