/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Destination;
import jakarta.jms.MessageConsumer;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreeBrokerQueueNetworkTest
extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ThreeBrokerQueueNetworkTest.class);
    protected static final int MESSAGE_COUNT = 100;
    private static final long MAX_WAIT_MILLIS = 10000L;

    public void testABandBCbrokerNetwork() throws Exception {
        this.bridgeBrokers("BrokerA", "BrokerB");
        this.bridgeBrokers("BrokerB", "BrokerC");
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer clientC = this.createConsumer("BrokerC", (Destination)dest);
        this.sendMessages("BrokerA", (Destination)dest, 100);
        Thread.sleep(1000L);
        MessageIdList msgsC = this.getConsumerMessages("BrokerC", clientC);
        ThreeBrokerQueueNetworkTest.assertEquals((int)0, (int)msgsC.getMessageCount());
    }

    public void testBAandBCbrokerNetwork() throws Exception {
        this.bridgeBrokers("BrokerB", "BrokerA");
        this.bridgeBrokers("BrokerB", "BrokerC");
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)dest);
        MessageConsumer clientC = this.createConsumer("BrokerC", (Destination)dest);
        Thread.sleep(2000L);
        this.sendMessages("BrokerB", (Destination)dest, 100);
        Thread.sleep(1000L);
        MessageIdList msgsA = this.getConsumerMessages("BrokerA", clientA);
        MessageIdList msgsC = this.getConsumerMessages("BrokerC", clientC);
        ThreeBrokerQueueNetworkTest.assertEquals((int)100, (int)(msgsA.getMessageCount() + msgsC.getMessageCount()));
    }

    public void testBAandBCbrokerNetworkWithSelectorsSendFirst() throws Exception {
        this.bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
        this.bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("broker", "BROKER_A");
        this.sendMessages("BrokerB", (Destination)dest, 100, props);
        props.clear();
        props.put("broker", "BROKER_C");
        this.sendMessages("BrokerB", (Destination)dest, 100, props);
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)dest, "broker = 'BROKER_A'");
        MessageConsumer clientC = this.createConsumer("BrokerC", (Destination)dest, "broker = 'BROKER_C'");
        Thread.sleep(2000L);
        MessageIdList msgsA = this.getConsumerMessages("BrokerA", clientA);
        MessageIdList msgsC = this.getConsumerMessages("BrokerC", clientC);
        ThreeBrokerQueueNetworkTest.assertEquals((int)100, (int)msgsA.getMessageCount());
        ThreeBrokerQueueNetworkTest.assertEquals((int)100, (int)msgsC.getMessageCount());
    }

    public void testBAandBCbrokerNetworkWithSelectorsSubscribeFirst() throws Exception {
        this.bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
        this.bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)dest, "broker = 'BROKER_A'");
        MessageConsumer clientC = this.createConsumer("BrokerC", (Destination)dest, "broker = 'BROKER_C'");
        Thread.sleep(2000L);
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("broker", "BROKER_A");
        this.sendMessages("BrokerB", (Destination)dest, 100, props);
        props.clear();
        props.put("broker", "BROKER_C");
        this.sendMessages("BrokerB", (Destination)dest, 100, props);
        Thread.sleep(1000L);
        MessageIdList msgsA = this.getConsumerMessages("BrokerA", clientA);
        MessageIdList msgsC = this.getConsumerMessages("BrokerC", clientC);
        ThreeBrokerQueueNetworkTest.assertEquals((int)100, (int)msgsA.getMessageCount());
        ThreeBrokerQueueNetworkTest.assertEquals((int)100, (int)msgsC.getMessageCount());
    }

    public void testABandCBbrokerNetwork() throws Exception {
        this.bridgeBrokers("BrokerA", "BrokerB");
        this.bridgeBrokers("BrokerC", "BrokerB");
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer clientB = this.createConsumer("BrokerB", (Destination)dest);
        this.sendMessages("BrokerA", (Destination)dest, 100);
        this.sendMessages("BrokerC", (Destination)dest, 100);
        MessageIdList msgsB = this.getConsumerMessages("BrokerB", clientB);
        msgsB.waitForMessagesToArrive(200);
        ThreeBrokerQueueNetworkTest.assertEquals((int)200, (int)msgsB.getMessageCount());
    }

    public void testAllConnectedBrokerNetwork() throws Exception {
        this.bridgeBrokers("BrokerA", "BrokerB");
        this.bridgeBrokers("BrokerB", "BrokerA");
        this.bridgeBrokers("BrokerB", "BrokerC");
        this.bridgeBrokers("BrokerC", "BrokerB");
        this.bridgeBrokers("BrokerA", "BrokerC");
        this.bridgeBrokers("BrokerC", "BrokerA");
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)dest);
        MessageConsumer clientB = this.createConsumer("BrokerB", (Destination)dest);
        MessageConsumer clientC = this.createConsumer("BrokerC", (Destination)dest);
        this.sendMessages("BrokerA", (Destination)dest, 100);
        this.sendMessages("BrokerB", (Destination)dest, 100);
        this.sendMessages("BrokerC", (Destination)dest, 100);
        Thread.sleep(1000L);
        MessageIdList msgsA = this.getConsumerMessages("BrokerA", clientA);
        MessageIdList msgsB = this.getConsumerMessages("BrokerB", clientB);
        MessageIdList msgsC = this.getConsumerMessages("BrokerC", clientC);
        ThreeBrokerQueueNetworkTest.assertEquals((int)300, (int)(msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()));
    }

    public void testAllConnectedUsingMulticast() throws Exception {
        this.bridgeAllBrokers();
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)dest);
        MessageConsumer clientB = this.createConsumer("BrokerB", (Destination)dest);
        MessageConsumer clientC = this.createConsumer("BrokerC", (Destination)dest);
        this.sendMessages("BrokerA", (Destination)dest, 100);
        this.sendMessages("BrokerB", (Destination)dest, 100);
        this.sendMessages("BrokerC", (Destination)dest, 100);
        Thread.sleep(1000L);
        final MessageIdList msgsA = this.getConsumerMessages("BrokerA", clientA);
        MessageIdList msgsB = this.getConsumerMessages("BrokerB", clientB);
        MessageIdList msgsC = this.getConsumerMessages("BrokerC", clientC);
        this.waitFor(new Condition(){

            @Override
            public boolean isSatisified() {
                return msgsA.getMessageCount() == 100;
            }
        });
        ThreeBrokerQueueNetworkTest.assertEquals((int)300, (int)(msgsA.getMessageCount() + msgsB.getMessageCount() + msgsC.getMessageCount()));
    }

    private void waitFor(Condition condition) throws Exception {
        long expiry = System.currentTimeMillis() + 10000L;
        while (!condition.isSatisified() && System.currentTimeMillis() < expiry) {
            Thread.sleep(1000L);
        }
        if (System.currentTimeMillis() >= expiry) {
            LOG.error("expired while waiting for condition " + String.valueOf(condition));
        }
    }

    public void testAllConnectedUsingMulticastProducerConsumerOnA() throws Exception {
        this.bridgeAllBrokers("default", 3, false);
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        int messageCount = 2000;
        CountDownLatch messagesReceived = new CountDownLatch(messageCount);
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)dest, messagesReceived);
        Thread.sleep(1000L);
        this.sendMessages("BrokerA", (Destination)dest, messageCount);
        ThreeBrokerQueueNetworkTest.assertTrue((boolean)messagesReceived.await(30L, TimeUnit.SECONDS));
        MessageIdList msgsA = this.getConsumerMessages("BrokerA", clientA);
        ThreeBrokerQueueNetworkTest.assertEquals((int)messageCount, (int)msgsA.getMessageCount());
    }

    public void testAllConnectedWithSpare() throws Exception {
        this.bridgeAllBrokers("default", 3, false);
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        int messageCount = 2000;
        CountDownLatch messagesReceived = new CountDownLatch(messageCount);
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)dest, messagesReceived);
        Thread.sleep(2000L);
        this.sendMessages("BrokerB", (Destination)dest, messageCount);
        ThreeBrokerQueueNetworkTest.assertTrue((String)"messaged received within time limit", (boolean)messagesReceived.await(30L, TimeUnit.SECONDS));
        MessageIdList msgsA = this.getConsumerMessages("BrokerA", clientA);
        ThreeBrokerQueueNetworkTest.assertEquals((int)messageCount, (int)msgsA.getMessageCount());
    }

    public void XtestMigrateConsumerStuckMessages() throws Exception {
        boolean suppressQueueDuplicateSubscriptions = false;
        this.bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions);
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        LOG.info("Consumer on A");
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)dest);
        Thread.sleep(2000L);
        LOG.info("Consumer on B");
        int messageCount = 2000;
        CountDownLatch messagesReceived = new CountDownLatch(messageCount / 2);
        MessageConsumer clientB = this.createConsumer("BrokerB", (Destination)dest, messagesReceived);
        Thread.sleep(2000L);
        LOG.info("Close consumer on A");
        clientA.close();
        Thread.sleep(2000L);
        LOG.info("Send to B");
        this.sendMessages("BrokerB", (Destination)dest, messageCount);
        ThreeBrokerQueueNetworkTest.assertTrue((boolean)messagesReceived.await(30L, TimeUnit.SECONDS));
        MessageIdList msgs = this.getConsumerMessages("BrokerB", clientB);
        Thread.sleep(500L);
        ThreeBrokerQueueNetworkTest.assertEquals((int)(messageCount / 2), (int)msgs.getMessageCount());
        messagesReceived = new CountDownLatch(messageCount / 2);
        clientA = this.createConsumer("BrokerA", (Destination)dest, messagesReceived);
        ThreeBrokerQueueNetworkTest.assertTrue((boolean)messagesReceived.await(30L, TimeUnit.SECONDS));
        msgs = this.getConsumerMessages("BrokerA", clientA);
        ThreeBrokerQueueNetworkTest.assertEquals((int)(messageCount / 2), (int)msgs.getMessageCount());
    }

    public void testMigrateConsumer() throws Exception {
        boolean suppressQueueDuplicateSubscriptions = true;
        boolean decreaseNetworkConsumerPriority = true;
        this.bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority);
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        LOG.info("Consumer on A");
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)dest);
        Thread.sleep(2000L);
        LOG.info("Consumer on B");
        int messageCount = 2000;
        CountDownLatch messagesReceived = new CountDownLatch(messageCount);
        MessageConsumer clientB = this.createConsumer("BrokerB", (Destination)dest, messagesReceived);
        MessageIdList msgs = this.getConsumerMessages("BrokerB", clientB);
        msgs.setProcessingDelay(10L);
        Thread.sleep(2000L);
        LOG.info("Close consumer on A");
        clientA.close();
        Thread.sleep(2000L);
        LOG.info("Send to B");
        this.sendMessages("BrokerB", (Destination)dest, messageCount);
        ThreeBrokerQueueNetworkTest.assertTrue((String)"messages are received within limit", (boolean)messagesReceived.await(60L, TimeUnit.SECONDS));
        ThreeBrokerQueueNetworkTest.assertEquals((int)messageCount, (int)msgs.getMessageCount());
    }

    public void testNoDuplicateQueueSubs() throws Exception {
        BrokerService broker;
        this.bridgeAllBrokers("default", 3, true);
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        String brokerName = "BrokerA";
        MessageConsumer consumer = this.createConsumer(brokerName, (Destination)dest);
        Thread.sleep(2000L);
        Collection brokerList = this.brokers.values();
        Iterator i = brokerList.iterator();
        while (i.hasNext()) {
            broker = ((JmsMultipleBrokersTestSupport.BrokerItem)i.next()).broker;
            this.verifyConsumerCount(broker, 1, (Destination)dest);
        }
        consumer.close();
        Thread.sleep(2000L);
        i = brokerList.iterator();
        while (i.hasNext()) {
            broker = ((JmsMultipleBrokersTestSupport.BrokerItem)i.next()).broker;
            this.verifyConsumerCount(broker, 0, (Destination)dest);
        }
    }

    public void testNoDuplicateQueueSubsHasLowestPriority() throws Exception {
        boolean suppressQueueDuplicateSubscriptions = true;
        boolean decreaseNetworkConsumerPriority = true;
        this.bridgeAllBrokers("default", 3, suppressQueueDuplicateSubscriptions, decreaseNetworkConsumerPriority);
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        JmsMultipleBrokersTestSupport.BrokerItem brokerB = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get("BrokerA");
        brokerB.broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin(){

            public Broker installPlugin(Broker broker) throws Exception {
                return new BrokerFilter(broker){
                    final AtomicInteger count;
                    {
                        this.count = new AtomicInteger();
                    }

                    public void preProcessDispatch(MessageDispatch messageDispatch) {
                        if (messageDispatch.getDestination().getPhysicalName().contains("ActiveMQ.Advisory.Consumer") && this.count.getAndIncrement() == 0) {
                            LOG.info("Sleeping on first advisory: " + String.valueOf(messageDispatch));
                            try {
                                Thread.sleep(2000L);
                            }
                            catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        super.postProcessDispatch(messageDispatch);
                    }
                };
            }
        }});
        this.startAllBrokers();
        this.waitForBridgeFormation();
        String brokerName = "BrokerA";
        this.createConsumer(brokerName, (Destination)dest);
        Thread.sleep(5000L);
        Collection brokerList = this.brokers.values();
        Iterator i = brokerList.iterator();
        while (i.hasNext()) {
            BrokerService broker = ((JmsMultipleBrokersTestSupport.BrokerItem)i.next()).broker;
            this.verifyConsumerCount(broker, 1, (Destination)dest);
            if (brokerName.equals(broker.getBrokerName())) continue;
            this.verifyConsumePriority(broker, (byte)-5, (Destination)dest);
        }
    }

    public void testDuplicateQueueSubs() throws Exception {
        BrokerService broker;
        this.configureBroker(this.createBroker("BrokerD"));
        this.bridgeAllBrokers("default", 3, false);
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        String brokerName = "BrokerA";
        MessageConsumer consumer = this.createConsumer(brokerName, (Destination)dest);
        Thread.sleep(2000L);
        this.verifyConsumerCount(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)brokerName)).broker, 1, (Destination)dest);
        Collection brokerList = this.brokers.values();
        Iterator i = brokerList.iterator();
        while (i.hasNext()) {
            broker = ((JmsMultipleBrokersTestSupport.BrokerItem)i.next()).broker;
            if (brokerName.equals(broker.getBrokerName())) continue;
            this.verifyConsumerCount(broker, 5, (Destination)dest);
            this.verifyConsumePriority(broker, (byte)0, (Destination)dest);
        }
        consumer.close();
        Thread.sleep(2000L);
        i = brokerList.iterator();
        while (i.hasNext()) {
            broker = ((JmsMultipleBrokersTestSupport.BrokerItem)i.next()).broker;
            if (brokerName.equals(broker.getBrokerName())) continue;
            this.logConsumerCount(broker, 0, (Destination)dest);
        }
        i = brokerList.iterator();
        while (i.hasNext()) {
            broker = ((JmsMultipleBrokersTestSupport.BrokerItem)i.next()).broker;
            this.verifyConsumerCount(broker, 0, (Destination)dest);
        }
    }

    private void verifyConsumerCount(BrokerService broker, int count, final Destination dest) throws Exception {
        final RegionBroker regionBroker = (RegionBroker)broker.getRegionBroker();
        this.waitFor(new Condition(){

            @Override
            public boolean isSatisified() throws Exception {
                return !regionBroker.getDestinations(ActiveMQDestination.transform((Destination)dest)).isEmpty();
            }
        });
        Queue internalQueue = (Queue)regionBroker.getDestinations(ActiveMQDestination.transform((Destination)dest)).iterator().next();
        LOG.info("Verify: consumer count on " + broker.getBrokerName() + " matches:" + count + ", val:" + internalQueue.getConsumers().size());
        ThreeBrokerQueueNetworkTest.assertEquals((String)("consumer count on " + broker.getBrokerName() + " matches for q: " + String.valueOf(internalQueue)), (int)count, (int)internalQueue.getConsumers().size());
    }

    private void logConsumerCount(BrokerService broker, int count, final Destination dest) throws Exception {
        final RegionBroker regionBroker = (RegionBroker)broker.getRegionBroker();
        this.waitFor(new Condition(){

            @Override
            public boolean isSatisified() throws Exception {
                return !regionBroker.getDestinations(ActiveMQDestination.transform((Destination)dest)).isEmpty();
            }
        });
        Queue internalQueue = (Queue)regionBroker.getDestinations(ActiveMQDestination.transform((Destination)dest)).iterator().next();
        LOG.info("Verify: consumer count on " + broker.getBrokerName() + " matches:" + count + ", val:" + internalQueue.getConsumers().size());
    }

    private void verifyConsumePriority(BrokerService broker, byte expectedPriority, Destination dest) throws Exception {
        RegionBroker regionBroker = (RegionBroker)broker.getRegionBroker();
        Queue internalQueue = (Queue)regionBroker.getDestinations(ActiveMQDestination.transform((Destination)dest)).iterator().next();
        for (Subscription consumer : internalQueue.getConsumers()) {
            ThreeBrokerQueueNetworkTest.assertEquals((String)("consumer on " + broker.getBrokerName() + " matches priority: " + String.valueOf(internalQueue)), (byte)expectedPriority, (byte)consumer.getConsumerInfo().getPriority());
        }
    }

    @Override
    public void configureBroker(BrokerService brokerService) {
        brokerService.setBrokerId(brokerService.getBrokerName());
    }

    @Override
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        this.createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
        this.createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
        this.createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
    }

    static interface Condition {
        public boolean isSatisified() throws Exception;
    }
}

