/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.net.URI;
import java.util.Arrays;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests how advisory messages behave across a network bridge between two
 * in-memory brokers named "A" and "B".
 *
 * <p>Each test creates both brokers through {@link #createBroker(String)},
 * bridges A to B, configures the resulting {@link NetworkConnector}, starts the
 * brokers and then checks either the messages seen by consumers or the
 * dequeue/inflight statistics of broker A.
 */
public class AdvisoryViaNetworkTest
extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(AdvisoryViaNetworkTest.class);

    /**
     * Creates a non-persistent broker without JMX, listening on an
     * OS-assigned TCP port, and registers it in {@code brokers}.
     *
     * @param brokerName name given to the broker and used as its key in {@code brokers}
     * @return the configured (not yet started) broker
     * @throws Exception if the connector URI is invalid or cannot be added
     */
    @Override
    protected BrokerService createBroker(String brokerName) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.setBrokerName(brokerName);
        // Port 0 lets the OS choose a free port.
        broker.addConnector(new URI("tcp://localhost:0"));
        this.brokers.put(brokerName, new JmsMultipleBrokersTestSupport.BrokerItem(broker));
        return broker;
    }

    /**
     * With the producer advisory topic for FOO statically included on a
     * one-way bridge, sending one message to FOO on A must yield two advisory
     * messages on both A and B.
     */
    public void testAdvisoryForwarding() throws Exception {
        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Producer.Topic.FOO");
        createBroker("A");
        createBroker("B");
        NetworkConnector networkBridge = bridgeBrokers("A", "B");
        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
        startAllBrokers();
        verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("A"), 1);

        MessageConsumer consumerA = createConsumer("A", advisoryTopic);
        MessageConsumer consumerB = createConsumer("B", advisoryTopic);
        sendMessages("A", new ActiveMQTopic("FOO"), 1);

        MessageIdList messagesA = getConsumerMessages("A", consumerA);
        MessageIdList messagesB = getConsumerMessages("B", consumerB);
        LOG.info("consumerA = {}", messagesA);
        LOG.info("consumerB = {}", messagesB);
        // NOTE(review): presumably the two messages are the producer add and
        // remove advisories - confirm against sendMessages().
        messagesA.assertMessagesReceived(2);
        messagesB.assertMessagesReceived(2);
    }

    /**
     * With advisoryPrefetchSize=10 and prefetchSize=1 on a duplex bridge, the
     * first subscription on B's advisory topic must report prefetch 10 and the
     * first subscription on A.FOO must report prefetch 1.
     */
    public void testAdvisoryPrefetchSize() throws Exception {
        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
        createBroker("A");
        BrokerService brokerB = createBroker("B");
        NetworkConnector networkBridge = bridgeBrokers("A", "B");
        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
        networkBridge.addStaticallyIncludedDestination(topic1);
        networkBridge.setDuplex(true);
        networkBridge.setAdvisoryPrefetchSize(10);
        networkBridge.setPrefetchSize(1);
        startAllBrokers();
        verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("A"), 1);

        createConsumer("A", topic1);
        createConsumer("A", new ActiveMQTopic("A.FOO2"));

        assertEquals(10, ((Subscription) brokerB.getDestination(advisoryTopic).getConsumers().get(0)).getPrefetchSize());
        assertEquals(1, ((Subscription) brokerB.getDestination(topic1).getConsumers().get(0)).getPrefetchSize());
        assertDeqInflight(0, 2);
    }

    /**
     * With advisoryPrefetchSize=1 and prefetchSize=10 on a duplex bridge, the
     * advisory subscription on B must report prefetch 1 and the A.FOO
     * subscription must report prefetch 10.
     */
    public void testAdvisoryPrefetchSize1() throws Exception {
        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
        createBroker("A");
        BrokerService brokerB = createBroker("B");
        NetworkConnector networkBridge = bridgeBrokers("A", "B");
        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
        networkBridge.addStaticallyIncludedDestination(topic1);
        networkBridge.setDuplex(true);
        networkBridge.setAdvisoryPrefetchSize(1);
        networkBridge.setPrefetchSize(10);
        startAllBrokers();
        verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("A"), 1);

        createConsumer("A", topic1);
        createConsumer("A", new ActiveMQTopic("A.FOO2"));

        assertEquals(1, ((Subscription) brokerB.getDestination(advisoryTopic).getConsumers().get(0)).getPrefetchSize());
        assertEquals(10, ((Subscription) brokerB.getDestination(topic1).getConsumers().get(0)).getPrefetchSize());
        assertDeqInflight(2, 0);
    }

    /**
     * When only prefetchSize=10 is configured, both the advisory subscription
     * and the A.FOO subscription on B must report prefetch 10.
     */
    public void testAdvisoryPrefetchSizeNotSet() throws Exception {
        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
        createBroker("A");
        BrokerService brokerB = createBroker("B");
        NetworkConnector networkBridge = bridgeBrokers("A", "B");
        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
        networkBridge.addStaticallyIncludedDestination(topic1);
        networkBridge.setDuplex(true);
        networkBridge.setPrefetchSize(10);
        startAllBrokers();
        verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("A"), 1);

        createConsumer("A", topic1);
        createConsumer("A", new ActiveMQTopic("A.FOO2"));

        assertEquals(10, ((Subscription) brokerB.getDestination(advisoryTopic).getConsumers().get(0)).getPrefetchSize());
        assertEquals(10, ((Subscription) brokerB.getDestination(topic1).getConsumers().get(0)).getPrefetchSize());
        assertDeqInflight(0, 2);
    }

    /**
     * When only prefetchSize=1 is configured, both the advisory subscription
     * and the A.FOO subscription on B must report prefetch 1.
     */
    public void testPrefetchSize1() throws Exception {
        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
        ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
        createBroker("A");
        BrokerService brokerB = createBroker("B");
        NetworkConnector networkBridge = bridgeBrokers("A", "B");
        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
        // topic1 is statically included, as in the sibling prefetch tests, so
        // the getConsumers().get(0) lookup on B below does not depend on a
        // demand subscription having been created in time.
        networkBridge.addStaticallyIncludedDestination(topic1);
        networkBridge.setDuplex(true);
        networkBridge.setPrefetchSize(1);
        startAllBrokers();
        verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("A"), 1);

        createConsumer("A", topic1);
        createConsumer("A", new ActiveMQTopic("A.FOO2"));

        assertEquals(1, ((Subscription) brokerB.getDestination(advisoryTopic).getConsumers().get(0)).getPrefetchSize());
        assertEquals(1, ((Subscription) brokerB.getDestination(topic1).getConsumers().get(0)).getPrefetchSize());
        assertDeqInflight(2, 0);
    }

    /**
     * With advisoryPrefetchSize=10 and advisoryAckPercentage=65, creating ten
     * consumers on A must leave broker A with 7 dequeues and 3 inflight.
     */
    public void testAdvisoryPrefetchSizePercent() throws Exception {
        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
        createBroker("A");
        createBroker("B");
        NetworkConnector networkBridge = bridgeBrokers("A", "B");
        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
        networkBridge.setDuplex(true);
        networkBridge.setAdvisoryPrefetchSize(10);
        networkBridge.setAdvisoryAckPercentage(65);
        startAllBrokers();
        verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("A"), 1);

        for (int i = 0; i < 10; ++i) {
            createConsumer("A", new ActiveMQTopic("A.FOO"));
        }
        assertDeqInflight(7, 3);
    }

    /**
     * Same as {@link #testAdvisoryPrefetchSizePercent()} but with the value 10
     * supplied through prefetchSize instead of advisoryPrefetchSize.
     */
    public void testPrefetchSizePercent() throws Exception {
        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
        createBroker("A");
        createBroker("B");
        NetworkConnector networkBridge = bridgeBrokers("A", "B");
        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
        networkBridge.setDuplex(true);
        networkBridge.setPrefetchSize(10);
        networkBridge.setAdvisoryAckPercentage(65);
        startAllBrokers();
        verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("A"), 1);

        for (int i = 0; i < 10; ++i) {
            createConsumer("A", new ActiveMQTopic("A.FOO"));
        }
        assertDeqInflight(7, 3);
    }

    /**
     * Polls broker A's aggregate destination statistics until the dequeue and
     * inflight counts both equal the expected values, failing the test if
     * {@link Wait#waitFor} gives up first.
     *
     * @param dequeue expected dequeue count on broker A
     * @param inflight expected inflight count on broker A
     */
    private void assertDeqInflight(final int dequeue, final int inflight) throws Exception {
        assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() {

            @Override
            public boolean isSatisified() throws Exception {
                RegionBroker regionBroker = (RegionBroker) ((JmsMultipleBrokersTestSupport.BrokerItem) AdvisoryViaNetworkTest.this.brokers.get("A")).broker.getRegionBroker();
                // Read each counter once so the logged value is the value compared.
                long deqCount = regionBroker.getDestinationStatistics().getDequeues().getCount();
                long inflightCount = regionBroker.getDestinationStatistics().getInflight().getCount();
                LOG.info("A Deq:{}", deqCount);
                LOG.info("A Inflight:{}", inflightCount);
                return deqCount == dequeue && inflightCount == inflight;
            }
        }));
    }

    /**
     * Same scenario as {@link #testAdvisoryForwarding()} but with a duplex
     * bridge; both advisory consumers must still see two messages.
     */
    public void testAdvisoryForwardingDuplexNC() throws Exception {
        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Producer.Topic.FOO");
        createBroker("A");
        createBroker("B");
        NetworkConnector networkBridge = bridgeBrokers("A", "B");
        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
        networkBridge.setDuplex(true);
        startAllBrokers();
        verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("A"), 1);

        MessageConsumer consumerA = createConsumer("A", advisoryTopic);
        MessageConsumer consumerB = createConsumer("B", advisoryTopic);
        sendMessages("A", new ActiveMQTopic("FOO"), 1);

        MessageIdList messagesA = getConsumerMessages("A", consumerA);
        MessageIdList messagesB = getConsumerMessages("B", consumerB);
        LOG.info("consumerA = {}", messagesA);
        LOG.info("consumerB = {}", messagesB);
        messagesA.assertMessagesReceived(2);
        messagesB.assertMessagesReceived(2);
    }

    /**
     * With the consumer advisory topic for FOO statically included, a new
     * consumer on A must produce one advisory on A and none on B.
     */
    public void testBridgeRelevantAdvisoryNotAvailable() throws Exception {
        ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.FOO");
        createBroker("A");
        createBroker("B");
        NetworkConnector networkBridge = bridgeBrokers("A", "B");
        networkBridge.addStaticallyIncludedDestination(advisoryTopic);
        startAllBrokers();
        verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("A"), 1);

        MessageConsumer consumerA = createConsumer("A", advisoryTopic);
        MessageConsumer consumerB = createConsumer("B", advisoryTopic);
        // Creating this consumer is what triggers the consumer advisory on A.
        createConsumer("A", new ActiveMQTopic("FOO"));

        MessageIdList messagesA = getConsumerMessages("A", consumerA);
        MessageIdList messagesB = getConsumerMessages("B", consumerB);
        LOG.info("consumerA = {}", messagesA);
        LOG.info("consumerB = {}", messagesB);
        messagesA.assertMessagesReceived(1);
        messagesB.assertMessagesReceived(0);
    }

    /**
     * Routes the connection advisory topic on A to queue {@code advQ} via a
     * {@link CompositeTopic}, consumes that queue from B over a duplex bridge
     * with prefetch 1, and checks that A ends up with more than two dequeues
     * and nothing inflight.
     */
    public void testAdvisoryViaVirtualDest() throws Exception {
        ActiveMQQueue advisoryQueue = new ActiveMQQueue("advQ");
        createBroker("A");

        CompositeTopic compositeTopic = new CompositeTopic();
        compositeTopic.setName("ActiveMQ.Advisory.Connection");
        compositeTopic.setForwardOnly(false);
        compositeTopic.setForwardTo(Arrays.asList(advisoryQueue));
        VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
        virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{compositeTopic});
        ((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("A")).broker.setDestinationInterceptors(new DestinationInterceptor[]{virtualDestinationInterceptor});

        createBroker("B");
        NetworkConnector networkBridge = bridgeBrokers("A", "B");
        networkBridge.setDuplex(true);
        networkBridge.setPrefetchSize(1);
        startAllBrokers();
        verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("A"), 1);
        verifyPeerBrokerInfo((JmsMultipleBrokersTestSupport.BrokerItem) this.brokers.get("B"), 1);

        MessageConsumer consumerB = createConsumer("B", advisoryQueue);
        // NOTE(review): presumably created only to open a client connection on
        // A and so generate a connection advisory - confirm.
        createConsumer("A", new ActiveMQTopic("FOO"));

        MessageIdList messagesB = getConsumerMessages("B", consumerB);
        messagesB.waitForMessagesToArrive(2);

        assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() {

            @Override
            public boolean isSatisified() throws Exception {
                RegionBroker regionBroker = (RegionBroker) ((JmsMultipleBrokersTestSupport.BrokerItem) AdvisoryViaNetworkTest.this.brokers.get("A")).broker.getRegionBroker();
                // Read each counter once so the logged value is the value compared.
                long deqCount = regionBroker.getDestinationStatistics().getDequeues().getCount();
                long inflightCount = regionBroker.getDestinationStatistics().getInflight().getCount();
                LOG.info("A Deq:{}", deqCount);
                LOG.info("A Inflight:{}", inflightCount);
                return deqCount > 2L && inflightCount == 0L;
            }
        }));
    }

    /**
     * Waits up to 120 seconds for the given broker to report exactly
     * {@code max} peer broker infos, logs the peers, and asserts the count.
     *
     * @param brokerItem wrapper of the broker whose peers are checked
     * @param max expected number of peer broker infos
     */
    private void verifyPeerBrokerInfo(JmsMultipleBrokersTestSupport.BrokerItem brokerItem, final int max) throws Exception {
        final BrokerService broker = brokerItem.broker;
        final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
        // The wait result is intentionally not checked here; the assertEquals
        // at the end reports a mismatch with the broker name as the message.
        Wait.waitFor(new Wait.Condition() {

            @Override
            public boolean isSatisified() throws Exception {
                LOG.info("verify infos {}, len: {}", broker.getBrokerName(), regionBroker.getPeerBrokerInfos().length);
                return max == regionBroker.getPeerBrokerInfos().length;
            }
        }, 120000L);
        LOG.info("verify infos {}, len: {}", broker.getBrokerName(), regionBroker.getPeerBrokerInfos().length);
        for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) {
            LOG.info(info.getBrokerName());
        }
        assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length);
    }

    /**
     * Enables the base class's auto-fail setting before running the standard
     * set-up.
     */
    @Override
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
    }
}

