/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.network.NetworkTestSupport;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.commons.lang.ArrayUtils;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MQTTNetworkOfBrokersFailoverTest
extends NetworkTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(MQTTNetworkOfBrokersFailoverTest.class);
    private int localBrokerMQTTPort = -1;
    private int remoteBrokerMQTTPort = -1;

    @Override
    protected void setUp() throws Exception {
        this.useJmx = true;
        super.setUp();
        URI ncUri = new URI("static:(" + this.connector.getConnectUri().toString() + ")");
        DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector(ncUri);
        nc.setDuplex(true);
        this.remoteBroker.addNetworkConnector((NetworkConnector)nc);
        nc.start();
        MQTTNetworkOfBrokersFailoverTest.assertFalse((this.localBrokerMQTTPort == -1 ? 1 : 0) != 0);
        MQTTNetworkOfBrokersFailoverTest.assertFalse((this.remoteBrokerMQTTPort == -1 ? 1 : 0) != 0);
    }

    @Override
    protected void tearDown() throws Exception {
        if (this.remoteBroker.isStarted()) {
            this.remoteBroker.stop();
            this.remoteBroker.waitUntilStopped();
        }
        if (this.broker.isStarted()) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
        super.tearDown();
    }

    @Test
    public void testNoStaleSubscriptionAcrossNetwork() throws Exception {
        CountDownLatch consumerNetworked = this.listenForConsumersOn(this.broker);
        MQTT remoteMqtt = this.createMQTTTcpConnection("foo", false, this.remoteBrokerMQTTPort);
        BlockingConnection remoteConn = remoteMqtt.blockingConnection();
        remoteConn.connect();
        remoteConn.subscribe(new org.fusesource.mqtt.client.Topic[]{new org.fusesource.mqtt.client.Topic("foo/bar", QoS.AT_LEAST_ONCE)});
        MQTTNetworkOfBrokersFailoverTest.assertTrue((String)"No destination detected!", (boolean)consumerNetworked.await(1L, TimeUnit.SECONDS));
        this.assertQueueExistsOn(this.remoteBroker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
        this.assertQueueExistsOn(this.broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
        remoteConn.disconnect();
        MQTT localMqtt = this.createMQTTTcpConnection("foo", false, this.localBrokerMQTTPort);
        BlockingConnection localConn = localMqtt.blockingConnection();
        localConn.connect();
        localConn.subscribe(new org.fusesource.mqtt.client.Topic[]{new org.fusesource.mqtt.client.Topic("foo/bar", QoS.AT_LEAST_ONCE)});
        remoteConn = remoteMqtt.blockingConnection();
        remoteConn.connect();
        remoteConn.publish("foo/bar", "Hello, World!".getBytes(), QoS.AT_LEAST_ONCE, false);
        Message msg = localConn.receive(100L, TimeUnit.SECONDS);
        MQTTNetworkOfBrokersFailoverTest.assertNotNull((Object)msg);
        msg.ack();
        String response = new String(msg.getPayload());
        MQTTNetworkOfBrokersFailoverTest.assertEquals((String)"Hello, World!", (String)response);
        MQTTNetworkOfBrokersFailoverTest.assertEquals((String)"foo/bar", (String)msg.getTopic());
        remoteConn.subscribe(new org.fusesource.mqtt.client.Topic[]{new org.fusesource.mqtt.client.Topic("foo/bar", QoS.AT_LEAST_ONCE)});
        msg = remoteConn.receive(500L, TimeUnit.MILLISECONDS);
        MQTTNetworkOfBrokersFailoverTest.assertNull((String)"We have duplicate messages across the cluster for a distributed topic", (Object)msg);
    }

    private CountDownLatch listenForConsumersOn(BrokerService broker) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        URI brokerUri = broker.getVmConnectorURI();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri.toASCIIString());
        final Connection connection = cf.createConnection();
        connection.start();
        final Session session = connection.createSession(false, 1);
        Topic dest = session.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar");
        MessageConsumer consumer = session.createConsumer((Destination)dest);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(javax.jms.Message message) {
                latch.countDown();
                Dispatch.getGlobalQueue().execute(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            session.close();
                            connection.close();
                        }
                        catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        });
        return latch;
    }

    private void assertQueueExistsOn(BrokerService broker, String queueName) throws Exception {
        BrokerView brokerView = broker.getAdminView();
        ObjectName[] queueNames = brokerView.getQueues();
        MQTTNetworkOfBrokersFailoverTest.assertEquals((int)1, (int)queueNames.length);
        MQTTNetworkOfBrokersFailoverTest.assertTrue((boolean)queueNames[0].toString().contains(queueName));
    }

    private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception {
        BrokerView brokerView = broker.getAdminView();
        Object[] activeDurableSubs = brokerView.getDurableTopicSubscribers();
        Object[] inactiveDurableSubs = brokerView.getInactiveDurableTopicSubscribers();
        ObjectName[] allDurables = (ObjectName[])ArrayUtils.addAll((Object[])activeDurableSubs, (Object[])inactiveDurableSubs);
        MQTTNetworkOfBrokersFailoverTest.assertEquals((int)1, (int)allDurables.length);
        DurableSubscriptionViewMBean durableSubView = (DurableSubscriptionViewMBean)broker.getManagementContext().newProxyInstance(allDurables[0], DurableSubscriptionViewMBean.class, true);
        MQTTNetworkOfBrokersFailoverTest.assertEquals((String)subName, (String)durableSubView.getClientId());
    }

    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = super.createBroker();
        broker.setPersistent(true);
        broker.setBrokerName("local");
        broker.setDataDirectory("target/activemq-data");
        broker.setDeleteAllMessagesOnStartup(true);
        TransportConnector tc = broker.addConnector(this.getDefaultMQTTTransportConnectorUri());
        this.localBrokerMQTTPort = tc.getConnectUri().getPort();
        return broker;
    }

    @Override
    protected BrokerService createRemoteBroker(PersistenceAdapter persistenceAdapter) throws Exception {
        BrokerService broker = super.createRemoteBroker(persistenceAdapter);
        broker.setPersistent(true);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setDataDirectory("target/activemq-data");
        TransportConnector tc = broker.addConnector(this.getDefaultMQTTTransportConnectorUri());
        this.remoteBrokerMQTTPort = tc.getConnectUri().getPort();
        return broker;
    }

    private String getDefaultMQTTTransportConnectorUri() {
        return "mqtt://localhost:0?transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
    }

    private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setConnectAttemptsMax(1L);
        mqtt.setReconnectAttemptsMax(0L);
        mqtt.setTracer(this.createTracer());
        if (clientId != null) {
            mqtt.setClientId(clientId);
        }
        mqtt.setCleanSession(clean);
        mqtt.setHost("localhost", port);
        return mqtt;
    }

    protected Tracer createTracer() {
        return new Tracer(){

            public void onReceive(MQTTFrame frame) {
                LOG.info("Client Received:\n" + frame);
            }

            public void onSend(MQTTFrame frame) {
                LOG.info("Client Sent:\n" + frame);
            }

            public void debug(String message, Object ... args) {
                LOG.info(String.format(message, args));
            }
        };
    }
}

